1 /**
2  * Copyright: Copyright Jason White, 2014-2016
3  * License:   $(WEB boost.org/LICENSE_1_0.txt, Boost License 1.0).
4  * Authors:   Jason White
5  *
6  * Description:
7  * This module provides _range interfaces for streams. This is useful for using
8  * many of the _range operations in $(D std._range) and $(D std.algorithm).
9  *
10  * There is an important distinction between streams and ranges to be made.
11  * Fundamentally, a stream is a unidirectional $(I stream) of bytes. That is,
12  * there is no going backwards and there is no saving the current position (as
13  * bidirectional and forward ranges can do). This provides a good mapping to
14  * input ranges and output ranges. As streams only operate on raw bytes, ranges
15  * provide an abstraction to operate on more complex data types.
16  */
17 module io.range;
18 
19 import io.stream;
20 
21 
/**
 * Input range that exposes the contents of a stream as a sequence of byte
 * chunks, each one no larger than the backing buffer.
 */
struct ByChunk(Stream)
    if (isSource!Stream)
{
    private
    {
        // Stream the chunks are pulled from.
        Stream _source;

        // Backing storage that every chunk is read into.
        ubyte[] _buffer;

        // Number of bytes at the start of the buffer filled by the last read.
        size_t _valid;
    }

    /**
     * Constructs the range, allocating a fresh byte buffer of the requested
     * size to hold each chunk.
     *
     * Params:
     *   source = A stream that can be read from.
     *   size = Maximum number of bytes to read per chunk.
     */
    this(Stream source, size_t size = 4096)
    {
        this(source, new ubyte[size]);
    }

    /**
     * Constructs the range on top of a caller-supplied buffer, which may for
     * instance live on the stack. The first chunk is read immediately.
     *
     * Params:
     *   source = A stream that can be read from.
     *   buffer = Byte array that receives each chunk as it is read.
     */
    this(Stream source, ubyte[] buffer)
    {
        _source = source;
        _buffer = buffer;

        // Load the first chunk so that front/empty are meaningful right away.
        popFront();
    }

    /**
     * Returns true once a read has produced no data, i.e. there are no more
     * chunks to be read from the stream.
     */
    bool empty() const pure nothrow
    {
        return !_valid;
    }

    /**
     * Returns: The chunk most recently read from the stream.
     *
     * The chunk may be shorter than the buffer when the stream performs a
     * partial read, so callers should not rely on a particular chunk size.
     */
    const(ubyte)[] front() const pure
    {
        const filled = _buffer[0 .. _valid];
        return filled;
    }

    /**
     * Advances the range by reading the next chunk from the stream into the
     * buffer.
     */
    void popFront()
    {
        _valid = _source.read(_buffer);
    }
}
95 
/**
 * Creates a $(D ByChunk) range over a stream, letting the range allocate its
 * own buffer of $(D size) bytes.
 *
 * Params:
 *   stream = A stream that can be read from.
 *   size = Maximum number of bytes to read per chunk.
 *
 * Example:
 * ---
 * import std.digest.digest : digest;
 * import std.digest.sha : SHA1;
 * import io.file;
 *
 * // Hash a file, 4KiB chunks at a time
 * ubyte[4096] buf;
 * auto sha1 = digest!SHA1(File("foo").byChunk(buf));
 * ---
 */
auto byChunk(Stream)(Stream stream, size_t size = 4096)
    if (isSource!Stream)
{
    alias Chunks = ByChunk!Stream;
    return Chunks(stream, size);
}
115 
/// Ditto
auto byChunk(Stream)(Stream stream, ubyte[] buffer)
    if (isSource!Stream)
{
    // Same as above, but the chunks are read into the caller's buffer.
    alias Chunks = ByChunk!Stream;
    return Chunks(stream, buffer);
}
122 
unittest
{
    // Round-trip: write a known payload, then read it back in fixed-size
    // chunks and compare against the expected partitioning.
    import io.file.temp;
    import std.algorithm : equal;
    import std.array : join;

    immutable chunkSize = 4;

    // Every entry is chunkSize bytes long except the last, which exercises a
    // short final chunk.
    immutable chunks = ["1234", "5678", "abcd", "efgh", "ij"];
    immutable data = chunks.join();

    auto f = tempFile.file;
    // writeExactly (as used by the other tests in this module) so the whole
    // payload is written before reading it back.
    f.writeExactly(data);
    f.position = 0;

    // Use the declared chunk size instead of repeating the literal.
    assert(f.byChunk(chunkSize).equal(chunks));
}
139 
/**
 * Input range that reads a stream as a sequence of fixed-size blocks, each
 * block being the raw in-memory representation of a $(D T). The stream is
 * assumed to be buffered so that the many small reads stay cheap.
 *
 * Streams and ranges are fundamentally different; this adapter makes it
 * possible to run range algorithms over a stream.
 *
 * Note: This is an input range and cannot be saved with $(D save()). Do not
 * mix use of the range with direct use of the underlying stream unless you
 * first seek the stream to a known location.
 */
struct ByBlock(T, Stream)
    if (isSource!Stream)
{
    private
    {
        // Stream the blocks are read from.
        Stream _source;

        // Storage for the block most recently read.
        T _current;

        // Set once a complete block could no longer be read.
        bool _empty = false;
    }

    /**
     * Constructs the range over the given source stream and reads the first
     * block.
     */
    this(Stream source)
    {
        _source = source;

        // Load the first block so front/empty are valid immediately.
        popFront();
    }

    /**
     * The range is empty as soon as fewer than $(D T.sizeof) bytes could be
     * read from the stream.
     *
     * Returns: True if there are no more blocks in the stream and false
     * otherwise.
     */
    @property bool empty() const pure nothrow
    {
        return _empty;
    }

    /**
     * Returns: The block most recently read from the stream.
     */
    @property ref const(T) front() const pure nothrow
    {
        return _current;
    }

    /**
     * Reads the next block from the stream, replacing the current one.
     */
    void popFront()
    {
        // View the storage of the current block as raw bytes so the stream
        // can fill it in place.
        auto raw = (cast(ubyte*) &_current)[0 .. T.sizeof];

        try
        {
            _source.readExactly(raw);
        }
        catch (ReadException)
        {
            // A full block was not available: the range is finished.
            _empty = true;
        }
    }
}
212 
/**
 * Convenience function that wraps a stream in a $(D ByBlock) range whose
 * elements are of type $(D T).
 *
 * Example:
 * ---
 * import io.file;
 *
 * struct Data { int a, b, c; }
 *
 * // Visit the file's contents one Data-sized block at a time.
 * foreach (block; File("foo").byBlock!Data)
 * {
 *     // Use block.a, block.b and block.c here.
 * }
 * ---
 */
@property auto byBlock(T, Stream)(Stream stream)
    if (isSource!Stream)
{
    alias Blocks = ByBlock!(T, Stream);
    return Blocks(stream);
}
227 
unittest
{
    // Round-trip an array of plain structs through a temporary file.
    import io.file.temp;
    import std.algorithm : equal;

    static struct Data
    {
        int a, b, c;
    }

    immutable Data[] data = [
        Data(1, 2, 3),
        Data(4, 5, 6),
        Data(7, 8, 9),
    ];

    // Store the raw structs, then rewind to the start of the file.
    auto file = tempFile.file;
    file.put(data);
    file.position = 0;

    // Reading block-by-block must reproduce the original sequence...
    auto blocks = file.byBlock!Data;
    assert(blocks.equal(data));

    // ...after which nothing is left to read.
    assert(file.byBlock!Data.empty);
}
253 
254 import std.range : back, isBidirectionalRange;
255 
/**
 * Checks if the given region ends with the given separator.
 *
 * The comparison is made on the last array element directly (no UTF
 * auto-decoding), so a region of $(D char)s that currently ends in the middle
 * of a multi-byte sequence is handled without throwing.
 *
 * Params:
 *   region = The elements read so far.
 *   separator = The single element that terminates a region.
 *
 * Returns: The number of elements that match: 1 if the last element of
 * $(D region) equals $(D separator), and 0 otherwise (including when
 * $(D region) is empty).
 */
size_t endsWithSeparator(T, Separator)
    (const(T)[] region, const Separator separator)
    if (is(typeof(T.init == Separator.init) : bool))
{
    // An empty region cannot end with anything.
    if (region.length == 0)
        return 0;

    // Index the last element rather than using std.range's back, which would
    // decode narrow strings and fail on incomplete sequences.
    return region[$ - 1] == separator ? 1 : 0;
}
268 
/// Ditto
size_t endsWithSeparator(T, Separator)
    (const(T)[] region, Separator sep)
    if (isBidirectionalRange!Separator &&
        is(typeof(T.init == sep.back.init) : bool))
{
    import std.range : ElementEncodingType, back, empty, popBack;
    import std.traits : isNarrowString;

    // When the separator is a narrow string with the same code unit type as
    // the region, walk it by code unit. Auto-decoding would count code points
    // (while callers shrink the region by elements) and would throw if the
    // data currently ends in the middle of a multi-byte sequence.
    static if (isNarrowString!Separator &&
               is(immutable(ElementEncodingType!Separator) == immutable(T)))
    {
        import std.utf : byCodeUnit;
        auto needle = sep.byCodeUnit;
    }
    else
    {
        auto needle = sep;
    }

    // Number of trailing region elements matched against the separator so far.
    size_t common = 0;

    // Compare from the back. The region is indexed directly so that it is
    // never auto-decoded.
    while (common < region.length && !needle.empty &&
           region[$ - 1 - common] == needle.back)
    {
        needle.popBack();
        ++common;
    }

    // Only a fully consumed separator counts as a match.
    return needle.empty ? common : 0;
}
289 
/**
 * Evaluates to true if $(D fn) can be called with an array of $(D T) and a
 * $(D Separator), which is what $(D Splitter) requires of its split function.
 */
template isSplitFunction(alias fn, T, Separator)
{
    enum bool isSplitFunction =
        is(typeof(fn(T[].init, Separator.init)));
}
295 
///
unittest
{
    // endsWithSeparator is usable both with a single element and with a range
    // of elements as the separator.
    static assert(isSplitFunction!(endsWithSeparator, char, char) &&
                  isSplitFunction!(endsWithSeparator, char, string));
}
302 
/**
 * Splits a stream using a separator. The separator can be a single element or a
 * bidirectional range of elements.
 *
 * The stream is read one $(D T) at a time. After each element is appended to
 * the current region, $(D splitFn) is asked how many trailing elements form
 * the separator. A non-zero answer ends the region, and those trailing
 * elements are removed from it.
 *
 * Params:
 *   T = Type of each element in the stream.
 *   Separator = Type of the separator passed to $(D splitFn).
 *   Stream = Type of the source stream.
 *   splitFn = Function taking the region read so far and the separator, and
 *             returning the number of trailing elements that make up the
 *             separator (0 if the region does not end with it).
 */
struct Splitter(T, Separator, Stream, alias splitFn = endsWithSeparator!(T, Separator))
    if (isSource!Stream && isSplitFunction!(splitFn, T, Separator))
{
    private
    {
        import std.array : Appender;

        alias Region = Appender!(T[]);

        // The current region
        Region _region;

        // Block iterator
        ByBlock!(T, Stream) _blocks;

        // True once the stream has no more regions to offer.
        bool _empty = false;

        // Element or range that separates regions.
        const Separator _separator;
    }

    /**
     * Initializes the splitter and reads the first region.
     *
     * Params:
     *   source = A stream that can be read from.
     *   separator = An element or range of elements to split on.
     */
    this(Stream source, const Separator separator)
    {
        _blocks = source.byBlock!(T, Stream);
        _separator = separator;

        // Prime the cannons
        popFront();
    }

    /**
     * Reads the next region from the stream. The range becomes empty when
     * this is called while no elements remain in the stream.
     */
    void popFront()
    {
        version (assert)
        {
            import core.exception : RangeError;
            if (empty) throw new RangeError();
        }

        // The region buffer is reused from one region to the next.
        _region.clear();

        if (_blocks.empty)
        {
            _empty = true;
            return;
        }

        while (!_blocks.empty)
        {
            _region.put(_blocks.front);

            // Consume the element as soon as it has been copied. Doing it
            // here, rather than once more after the loop, avoids issuing an
            // extra read after the stream has already been exhausted.
            _blocks.popFront();

            if (auto len = splitFn(_region.data, _separator))
            {
                // Drop the separator from the end of the region.
                assert(_region.data.length >= len);
                _region.shrinkTo(_region.data.length - len);
                break;
            }
        }
    }

    /**
     * Gets the current region in the stream.
     *
     * The returned slice refers to the splitter's internal buffer, which is
     * cleared and refilled by $(D popFront). Copy it (for example with
     * $(D idup)) if it must outlive the next call to $(D popFront).
     */
    const(T)[] front() const pure nothrow
    {
        version (assert)
        {
            import core.exception : RangeError;
            if (empty) throw new RangeError();
        }

        return _region.data;
    }

    /**
     * Returns true if there are no more regions in the splitter.
     */
    bool empty() const pure nothrow
    {
        return _empty;
    }
}
392 
unittest
{
    // Compile-time checks of which range categories Splitter satisfies.
    import std.range.primitives;
    import io.file : File;

    alias ByChar = Splitter!(char, char, File);

    // It is an input range for element and range separators alike...
    static assert(isInputRange!ByChar);
    static assert(isInputRange!(Splitter!(char, string, File)));
    static assert(isInputRange!(Splitter!(int, int, File)));
    static assert(isInputRange!(Splitter!(int, immutable(int)[], File)));

    // ...and nothing more than an input range.
    static assert(!isOutputRange!(ByChar, char));
    static assert(!isForwardRange!ByChar);
    static assert(!isBidirectionalRange!ByChar);
    static assert(!isRandomAccessRange!ByChar);
}
410 
/**
 * Convenience function that constructs a $(D Splitter) over a stream using
 * the default split function.
 *
 * Params:
 *   T = Type of each element in the stream.
 *   stream = A source stream that can be read from.
 *   separator = An element or range of elements to split on.
 *
 * Example:
 * ---
 * // Get a list of words from standard input.
 * import io;
 * import std.algorithm : map, filter;
 * import std.array : array;
 * auto words = stdin.splitter!char(' ')
 *                   .filter!(w => w != "")
 *                   .map!(w => w.idup)
 *                   .array;
 * ---
 */
auto splitter(T = char, Separator, Stream)(Stream stream, Separator separator)
    if (isSource!Stream)
{
    alias Regions = Splitter!(T, Separator, Stream);
    return Regions(stream, separator);
}
436 
unittest
{
    // Splitting an empty stream must not produce any regions.
    import io.file.temp;
    import std.algorithm : equal;

    string[] none;
    auto regions = tempFile.file.splitter!char('\n');
    assert(regions.equal(none));
}
444 
version (unittest)
{
    /*
     * Test helper: joins the regions with the separator, writes the result to
     * a temporary file and checks that splitting the file yields the regions
     * again, both without and with a trailing separator.
     */
    void testSplitter(T, Separator)(const T[][] regions, Separator separator)
    {
        import io.file.temp;
        import std.array : join;
        import std.algorithm : equal;
        import std.traits : isArray;

        // Normalize the separator to an array of elements so it can be used
        // for both joining and writing.
        static if (isArray!Separator)
            auto sepElements = separator;
        else
            auto sepElements = [separator];

        auto joined = regions.join(sepElements);

        auto f = tempFile.file;
        f.writeExactly(joined);
        f.position = 0;

        // The regions come back intact and the whole file has been consumed.
        assert(f.splitter!T(separator).equal(regions));
        assert(f.position == joined.length * T.sizeof);

        // Add a trailing separator at the end of the file; it must not
        // produce an additional region.
        f.writeExactly(sepElements);
        f.position = 0;
        assert(f.splitter!T(separator).equal(regions));
    }
}
475 
unittest
{
    // Exercise the splitter with single-element and multi-element separators.
    immutable lines = [
        "This is the first line",
        "",
        "That was a blank line.",
        "This is the penultimate line!",
        "This is the last line.",
    ];

    // Single element separator.
    testSplitter(lines, '\n');

    // Range separators of various lengths.
    foreach (sep; ["\n", "\r\n", "\n\n", "||||"])
        testSplitter(lines, sep);
}
493 
unittest
{
    // The splitter composes with std.algorithm: dropping the empty regions
    // produced by runs of spaces leaves just the words.
    import io.file.temp;
    import std.algorithm : filter, equal;

    immutable sentence = "The    quick brown  fox jumps over the lazy dog    ";
    immutable words = [
        "The", "quick", "brown", "fox", "jumps", "over", "the", "lazy", "dog"
    ];

    auto f = tempFile.file;
    f.writeExactly(sentence);
    f.position = 0;

    auto nonEmpty = f.splitter!char(' ').filter!(w => w.length != 0);
    assert(nonEmpty.equal(words));
}