1 /** 2 * Copyright: Copyright Jason White, 2014-2016 3 * License: $(WEB boost.org/LICENSE_1_0.txt, Boost License 1.0). 4 * Authors: Jason White 5 * 6 * Description: 7 * This module provides _range interfaces for streams. This is useful for using 8 * many of the _range operations in $(D std._range) and $(D std.algorithm). 9 * 10 * There is an important distinction between streams and ranges to be made. 11 * Fundamentally, a stream is a unidirectional $(I stream) of bytes. That is, 12 * there is no going backwards and there is no saving the current position (as 13 * bidirectional and forward ranges can do). This provides a good mapping to 14 * input ranges and output ranges. As streams only operate on raw bytes, ranges 15 * provide an abstraction to operate on more complex data types. 16 */ 17 module io.range; 18 19 import io.stream; 20 21 22 /** 23 * Range that reads up to a fixed size chunk of data from a stream at a time. 24 */ 25 struct ByChunk(Stream) 26 if (isSource!Stream) 27 { 28 private 29 { 30 Stream _source; 31 32 // Buffer to read in the data into. 33 ubyte[] _buffer; 34 35 // Length of valid data in the buffer 36 size_t _valid; 37 } 38 39 /** 40 * Initializes the range. A byte buffer with the given size is allocated to 41 * hold the chunks. 42 * 43 * Params: 44 * source = A stream that can be read from. 45 * size = The size of each chunk to read at a time. 46 */ 47 this(Stream source, size_t size = 4096) 48 { 49 this(source, new ubyte[](size)); 50 } 51 52 /** 53 * Initializes the range with the specified buffer. This is useful for 54 * providing your own buffer that may be stack allocated. 55 * 56 * Params: 57 * source = A stream that can be read from. 58 * buffer = A byte array to hold each chunk as it is read. 59 */ 60 this(Stream source, ubyte[] buffer) 61 { 62 _source = source; 63 _buffer = buffer; 64 popFront(); 65 } 66 67 /** 68 * Reads the next chunk from the stream. 69 */ 70 void popFront() 71 { 72 _valid = _source.read(_buffer); 73 } 74 75 /** 76 * Returns: The current chunk of the stream. 77 * 78 * Note that a full chunk is not guaranteed to be returned. In the event of 79 * a partial read from the stream, this will be less than the maximum chunk 80 * size. Code should be impartial to the size of the returned chunk. 81 */ 82 const(ubyte)[] front() const pure 83 { 84 return _buffer[0 .. _valid]; 85 } 86 87 /** 88 * Returns true if there are no more chunks to be read from the stream. 89 */ 90 bool empty() const pure nothrow 91 { 92 return _valid == 0; 93 } 94 } 95 96 /** 97 * Convenience function for creating a $(D ByChunk) range over a stream. 98 * 99 * Example: 100 * --- 101 * import std.digest.digest : digest; 102 * import std.digest.sha : SHA1; 103 * import io.file; 104 * 105 * // Hash a file, 4KiB chunks at a time 106 * ubyte[4096] buf; 107 * auto sha1 = digest!SHA1(File("foo").byChunk(buf)); 108 * --- 109 */ 110 auto byChunk(Stream)(Stream stream, size_t size = 4096) 111 if (isSource!Stream) 112 { 113 return ByChunk!Stream(stream, size); 114 } 115 116 /// Ditto 117 auto byChunk(Stream)(Stream stream, ubyte[] buffer) 118 if (isSource!Stream) 119 { 120 return ByChunk!Stream(stream, buffer); 121 } 122 123 unittest 124 { 125 import io.file.temp; 126 import std.algorithm : equal; 127 import std.array : join; 128 129 immutable chunkSize = 4; 130 immutable chunks = ["1234", "5678", "abcd", "efgh", "ij"]; 131 immutable data = chunks.join(); 132 133 auto f = tempFile.file; 134 f.write(data); 135 f.position = 0; 136 137 assert(f.byChunk(4).equal(chunks)); 138 } 139 140 /** 141 * Wraps a stream in a range interface such that blocks of a fixed size are read 142 * from the source. It is assumed that the stream is buffered so that 143 * performance is not adversely affected. 144 * 145 * Since streams and ranges are fundamentally different, this is useful for 146 * performing range operations on streams. 147 * 148 * Note: This is an input range and cannot be saved with $(D save()). Thus, 149 * usage of this should not be mixed with the underlying stream without first 150 * seeking to a specific location in the stream. 151 */ 152 struct ByBlock(T, Stream) 153 if (isSource!Stream) 154 { 155 private 156 { 157 Stream _source; 158 159 // The current block in the stream. 160 T _current; 161 162 // Are we there yet? 163 bool _empty = false; 164 } 165 166 /** 167 * Initializes the range with a source stream. 168 */ 169 this(Stream source) 170 { 171 _source = source; 172 173 // Prime the cannons. 174 popFront(); 175 } 176 177 /** 178 * Removes one block from the stream. 179 */ 180 void popFront() 181 { 182 try 183 { 184 _source.readExactly((cast(ubyte*)&_current)[0 .. T.sizeof]); 185 } 186 catch (ReadException e) 187 { 188 _empty = true; 189 } 190 } 191 192 /** 193 * Returns: The current block in the stream. 194 */ 195 @property ref const(T) front() const pure nothrow 196 { 197 return _current; 198 } 199 200 /** 201 * The range is considered empty when less than $(D T.sizeof) bytes can be 202 * read from the stream. 203 * 204 * Returns: True if there are no more blocks in the stream and false 205 * otherwise. 206 */ 207 @property bool empty() const pure nothrow 208 { 209 return _empty; 210 } 211 } 212 213 /** 214 * Helper function for constructing a block range. 215 * 216 * Example: 217 * --- 218 * import std.algorithm : equal; 219 * import std.range : take; 220 * --- 221 */ 222 @property auto byBlock(T, Stream)(Stream stream) 223 if (isSource!Stream) 224 { 225 return ByBlock!(T, Stream)(stream); 226 } 227 228 unittest 229 { 230 import io.file.temp; 231 import std.algorithm : equal; 232 233 static struct Data 234 { 235 int a, b, c; 236 } 237 238 immutable Data[] data = [ 239 {1, 2, 3}, 240 {4, 5, 6}, 241 {7, 8, 9}, 242 ]; 243 244 // Write some data to the file 245 auto f = tempFile.file; 246 f.put(data); 247 f.position = 0; 248 249 // Read it back in, block-by-block 250 assert(f.byBlock!Data.equal(data)); 251 assert(f.byBlock!Data.empty); 252 } 253 254 import std.range : back, isBidirectionalRange; 255 256 /** 257 * Checks if the given region ends with the given separator. 258 * 259 * Returns: The number of elements that match. 260 */ 261 size_t endsWithSeparator(T, Separator) 262 (const(T)[] region, const Separator separator) 263 if (is(typeof(T.init == Separator.init) : bool)) 264 { 265 import std.range : back; 266 return region.back == separator; 267 } 268 269 /// Ditto 270 size_t endsWithSeparator(T, Separator) 271 (const(T)[] region, Separator sep) 272 if (isBidirectionalRange!Separator && 273 is(typeof(T.init == sep.back.init) : bool)) 274 { 275 import std.range : back, empty, popBack; 276 277 size_t common = 0; 278 279 while (!region.empty && !sep.empty && region.back == sep.back) 280 { 281 region.popBack(); 282 sep.popBack(); 283 284 ++common; 285 } 286 287 return sep.empty ? common : 0; 288 } 289 290 /** 291 * Checks if the given function can be used with $(D Splitter). 292 */ 293 enum isSplitFunction(alias fn, T, Separator) = 294 is(typeof(fn(T[].init, Separator.init))); 295 296 /// 297 unittest 298 { 299 static assert(isSplitFunction!(endsWithSeparator, char, char)); 300 static assert(isSplitFunction!(endsWithSeparator, char, string)); 301 } 302 303 /** 304 * Splits a stream using a separator. The separator can be a single element or a 305 * bidirectional range of elements. 306 */ 307 struct Splitter(T, Separator, Stream, alias splitFn = endsWithSeparator!(T, Separator)) 308 if (isSource!Stream && isSplitFunction!(splitFn, T, Separator)) 309 { 310 private 311 { 312 import std.array : Appender; 313 314 alias Region = Appender!(T[]); 315 316 // The current region 317 Region _region; 318 319 // Block iterator 320 ByBlock!(T, Stream) _blocks; 321 322 bool _empty = false; 323 324 // Element or range that separates regions. 325 const Separator _separator; 326 } 327 328 this(Stream source, const Separator separator) 329 { 330 _blocks = source.byBlock!(T, Stream); 331 _separator = separator; 332 333 // Prime the cannons 334 popFront(); 335 } 336 337 void popFront() 338 { 339 version (assert) 340 { 341 import core.exception : RangeError; 342 if (empty) throw new RangeError(); 343 } 344 345 _region.clear(); 346 347 if (_blocks.empty) 348 { 349 _empty = true; 350 return; 351 } 352 353 while (!_blocks.empty) 354 { 355 _region.put(_blocks.front); 356 357 if (auto len = splitFn(_region.data, _separator)) 358 { 359 assert(_region.data.length >= len); 360 _region.shrinkTo(_region.data.length - len); 361 break; 362 } 363 364 _blocks.popFront(); 365 } 366 367 _blocks.popFront(); 368 } 369 370 /** 371 * Gets the current region in the stream. 372 */ 373 const(T)[] front() const pure nothrow 374 { 375 version (assert) 376 { 377 import core.exception : RangeError; 378 if (empty) throw new RangeError(); 379 } 380 381 return _region.data; 382 } 383 384 /** 385 * Returns true if there are no more regions in the splitter. 386 */ 387 bool empty() const pure nothrow 388 { 389 return _empty; 390 } 391 } 392 393 unittest 394 { 395 import std.range.primitives; 396 import io.file : File; 397 398 alias S = File; 399 400 static assert(isInputRange!(Splitter!(char, char, S))); 401 static assert(isInputRange!(Splitter!(char, string, S))); 402 static assert(isInputRange!(Splitter!(int, int, S))); 403 static assert(isInputRange!(Splitter!(int, immutable(int)[], S))); 404 405 static assert(!isOutputRange!(Splitter!(char, char, S), char)); 406 static assert(!isForwardRange!(Splitter!(char, char, S))); 407 static assert(!isBidirectionalRange!(Splitter!(char, char, S))); 408 static assert(!isRandomAccessRange!(Splitter!(char, char, S))); 409 } 410 411 /** 412 * Convenience function for returning a stream splitter. 413 * 414 * Params: 415 * T = Type of each element in the stream. 416 * stream = A sink stream that can be read from. 417 * separator = An element or range of elements to split on. 418 * 419 * Example: 420 * --- 421 * // Get a list of words from standard input. 422 * import io; 423 * import std.algorithm : map, filter; 424 * import std.array : array; 425 * auto words = stdin.splitter!char(' ') 426 * .filter!(w => w != "") 427 * .map!(w => w.idup) 428 * .array; 429 * --- 430 */ 431 auto splitter(T = char, Separator, Stream)(Stream stream, Separator separator) 432 if (isSource!Stream) 433 { 434 return Splitter!(T, Separator, Stream)(stream, separator); 435 } 436 437 unittest 438 { 439 // Test an empty split 440 import io.file.temp; 441 import std.algorithm : equal; 442 assert(tempFile.file.splitter!char('\n').equal(string[].init)); 443 } 444 445 version (unittest) 446 { 447 void testSplitter(T, Separator)(const T[][] regions, Separator separator) 448 { 449 import io.file.temp; 450 import std.array : join; 451 import std.algorithm : equal; 452 import std.traits : isArray; 453 454 static if (isArray!Separator) 455 auto joined = regions.join(separator); 456 else 457 auto joined = regions.join([separator]); 458 459 auto f = tempFile.file; 460 f.writeExactly(joined); 461 f.position = 0; 462 463 assert(f.splitter!T(separator).equal(regions)); 464 assert(f.position == joined.length * T.sizeof); 465 466 // Add a trailing separator at the end of the file 467 static if (isArray!Separator) 468 f.writeExactly(separator); 469 else 470 f.writeExactly([separator]); 471 f.position = 0; 472 assert(f.splitter!T(separator).equal(regions)); 473 } 474 } 475 476 unittest 477 { 478 // Test different types of separators. 479 immutable lines = [ 480 "This is the first line", 481 "", 482 "That was a blank line.", 483 "This is the penultimate line!", 484 "This is the last line.", 485 ]; 486 487 testSplitter(lines, '\n'); 488 testSplitter(lines, "\n"); 489 testSplitter(lines, "\r\n"); 490 testSplitter(lines, "\n\n"); 491 testSplitter(lines, "||||"); 492 } 493 494 unittest 495 { 496 // Test combining with filter 497 import io.file.temp; 498 import std.algorithm : filter, equal; 499 500 immutable data = "The quick brown fox jumps over the lazy dog "; 501 immutable result = [ 502 "The", "quick", "brown", "fox", "jumps", "over", "the", "lazy", "dog" 503 ]; 504 505 auto f = tempFile.file; 506 f.writeExactly(data); 507 f.position = 0; 508 509 assert(f.splitter!char(' ').filter!(w => w != "").equal(result)); 510 }