1 /**
2  * Copyright: Copyright Jason White, 2014-2016
3  * License:   $(WEB boost.org/LICENSE_1_0.txt, Boost License 1.0).
4  * Authors:   Jason White
5  *
6  * Description:
7  * Buffers a stream using a fixed-size buffer.
8  */
9 module io.buffer.fixed;
10 
11 import io.stream.types;
12 import io.stream.traits;
13 import io.buffer.traits;
14 
15 struct FixedBufferBase(Stream)
16     if (is(Stream == struct) && isBufferable!Stream)
17 {
18     Stream stream;
19 
20     alias stream this;
21 
22     // Buffer to store the data to be read or written.
23     private ubyte[] _buffer;
24 
25     // Current read/write position in the buffer. For writes, this is >0 if data
26     // has been written to the buffer but not flushed.
27     private size_t _position;
28 
29     @disable this(this);
30 
31     /**
32      * Forwards arguments to super class.
33      */
34     this(T...)(auto ref T args)
35     {
36         import std.functional : forward;
37         stream = Stream(forward!args);
38         _buffer.length = 8192;
39     }
40 
41     /**
42      * Upon destruction, any pending writes are flushed to the underlying
43      * stream.
44      */
45     ~this()
46     {
47         static if (isSink!Stream)
48             flush();
49     }
50 
51     /**
52      * Sets the size of the buffer. The default is 8192 bytes. This will only
53      * succeed if no data has been buffered (e.g., just after construction).
54      */
55     @property void bufferSize(size_t size)
56     {
57         if (_position > 0) return;
58 
59         static if (isSource!Stream)
60         {
61             if (_valid > 0) return;
62         }
63 
64         _buffer.length = size;
65     }
66 
67     /**
68      * Gets the current buffer size. The default is 8192 bytes.
69      */
70     @property size_t bufferSize() const pure nothrow
71     {
72         return _buffer.length;
73     }
74 
75     static if (isSource!Stream)
76     {
77         // Last valid position in the buffer. This is 0 if no read data is
78         // sitting in the buffer.
79         private size_t _valid;
80 
81         /**
82          * Initiates a read. This handles flushing any data previously written.
83          */
84         static if (isSink!Stream)
85         {
86             private void beginRead()
87             {
88                 flush();
89             }
90         }
91         else
92         {
93             // Nothing to do, this should be optimized away.
94             private void beginRead() {}
95         }
96 
97         private size_t readPartial(scope ubyte[] buf)
98         {
99             import std.algorithm : min;
100 
101             // Satisfy what can be copied so far from the buffer.
102             immutable satisfiable = min(_valid - _position, buf.length);
103             buf[0 .. satisfiable] = _buffer[_position .. _position + satisfiable];
104             _position += satisfiable;
105 
106             return satisfiable;
107         }
108 
109         /**
110          * Reads data from the stream into the given buffer. The number of bytes
111          * read is returned.
112          */
113         size_t read(scope ubyte[] buf)
114         {
115             beginRead();
116 
117             immutable satisfied = readPartial(buf);
118             if (satisfied == buf.length)
119                 return satisfied;
120 
121             buf = buf[satisfied .. $];
122 
123             // Large read? Get it directly from the stream.
124             if (buf.length >= _buffer.length)
125                 return satisfied + stream.read(buf);
126 
127             // Buffer is empty, fill it back up.
128             immutable bytesRead = stream.read(_buffer);
129             _position = 0;
130             _valid = bytesRead;
131 
132             // Finish the copy
133             return satisfied + readPartial(buf);
134         }
135     }
136 
137     static if (isSink!Stream)
138     {
139         /**
140          * Initiates a write. This will handle seeking to the correct position
141          * due to a previous read.
142          */
143         static if (isSource!Stream)
144         {
145             private void beginWrite()
146             {
147                 if (_valid == 0) return;
148 
149                 // The length of the window indicates how much data hasn't
150                 // "really" been read from the stream. Just seek backwards that
151                 // distance.
152                 stream.seekTo(_position - _valid, From.here);
153                 _position = _valid = 0;
154             }
155         }
156         else
157         {
158             // Nothing to do. This should be optimized away.
159             private void beginWrite() {}
160         }
161 
162         /*
163          * Copies as much as possible to the stream buffer. The number of bytes
164          * copied is returned.
165          */
166         private size_t writePartial(in ubyte[] buf)
167         {
168             import std.algorithm : min;
169 
170             immutable satisfiable = min(_buffer.length - _position, buf.length);
171             _buffer[_position .. _position + satisfiable] = buf[0 .. satisfiable];
172             _position += satisfiable;
173 
174             return satisfiable;
175         }
176 
177         /**
178          * Writes the given data to the buffered stream. When the internal
179          * buffer is completely filled, it is flushed to the underlying stream.
180          */
181         size_t write(in ubyte[] buf)
182         {
183             beginWrite();
184 
185             immutable satisfied = writePartial(buf);
186             if (satisfied == buf.length)
187                 return satisfied;
188 
189             const(ubyte)[] leftOver = buf[satisfied .. $];
190 
191             // Buffer is full and there is more to write. Flush it.
192             flush();
193 
194             // Large write? Push it directly to the stream.
195             if (leftOver.length >= _buffer.length)
196                 return satisfied + stream.write(leftOver);
197 
198             // Write the rest.
199             return satisfied + writePartial(leftOver);
200         }
201 
202         alias put = write;
203 
204         /**
205          * Writes any pending data to the underlying stream.
206          */
207         void flush()
208         {
209             static if (isSource!Stream)
210             {
211                 if (_valid > 0)
212                     return;
213             }
214 
215             if (_position > 0)
216             {
217                 immutable bytesWritten = stream.write(_buffer[0 .. _position]);
218                 assert(bytesWritten == _position);
219                 _position = 0;
220             }
221         }
222     }
223 
224     static if (isSeekable!Stream)
225     {
226         /**
227          * Seeks to the given position relative to the given starting point.
228          */
229         long seekTo(long offset, From from = From.start)
230         {
231             static if (isSource!Stream)
232             {
233                 if (_valid > 0)
234                 {
235                     if (from == From.here)
236                     {
237                         // Can we seek within the buffer?
238                         // FIXME: Handle potential integer overflow.
239                         if (_position + offset < _valid)
240                         {
241                             _position += offset;
242                             return stream.seekTo(0, From.here) + (_position - _valid);
243                         }
244                     }
245 
246                     // Invalidate the window
247                     _position = _valid = 0;
248                 }
249             }
250 
251             static if (isSink!Stream)
252             {
253                 flush();
254             }
255 
256             return stream.seekTo(offset, from);
257         }
258     }
259 }
260 
261 import std.typecons : RefCounted, RefCountedAutoInitialize;
262 alias FixedBuffer(Stream) = RefCounted!(FixedBufferBase!(Stream), RefCountedAutoInitialize.no);