/**
 * Copyright: Copyright Jason White, 2014-2016
 * License:   $(WEB boost.org/LICENSE_1_0.txt, Boost License 1.0).
 * Authors:   Jason White
 *
 * Description:
 * Buffers a stream using a fixed-size buffer.
 */
module io.buffer.fixed;

import io.stream.types;
import io.stream.traits;
import io.buffer.traits;

/**
 * Wraps a stream and routes reads and writes through a single fixed-size
 * buffer.
 *
 * The same buffer serves both directions:
 * $(UL
 *   $(LI While `_valid > 0` the buffer holds a $(I read window):
 *        `_buffer[_position .. _valid]` is data already fetched from the
 *        underlying stream but not yet handed to the caller.)
 *   $(LI While `_valid == 0` and `_position > 0` the buffer holds pending
 *        writes: `_buffer[0 .. _position]` has not been flushed yet.)
 * )
 */
struct FixedBufferBase(Stream)
    if (is(Stream == struct) && isBufferable!Stream)
{
    /// The wrapped stream. Unbuffered members are reachable via `alias this`.
    Stream stream;

    alias stream this;

    // Buffer to store the data to be read or written.
    private ubyte[] _buffer;

    // Current read/write position in the buffer. For writes, this is >0 if data
    // has been written to the buffer but not flushed.
    private size_t _position;

    // Copying would duplicate the buffering state for one underlying stream.
    @disable this(this);

    /**
     * Constructs the underlying stream by forwarding all arguments to its
     * constructor, then allocates the default 8192-byte buffer.
     */
    this(T...)(auto ref T args)
    {
        import std.functional : forward;
        stream = Stream(forward!args);
        _buffer.length = 8192;
    }

    /**
     * Upon destruction, any pending writes are flushed to the underlying
     * stream.
     */
    ~this()
    {
        static if (isSink!Stream)
            flush();
    }

    /**
     * Sets the size of the buffer. The default is 8192 bytes. This will only
     * succeed if no data has been buffered (e.g., just after construction);
     * otherwise the request is silently ignored.
     */
    @property void bufferSize(size_t size)
    {
        // Pending writes (or a partially consumed read window) are present.
        if (_position > 0) return;

        static if (isSource!Stream)
        {
            // A read window is present.
            if (_valid > 0) return;
        }

        _buffer.length = size;
    }

    /**
     * Gets the current buffer size. The default is 8192 bytes.
     */
    @property size_t bufferSize() const pure nothrow
    {
        return _buffer.length;
    }

    static if (isSource!Stream)
    {
        // Last valid position in the buffer. This is 0 if no read data is
        // sitting in the buffer.
        private size_t _valid;

        /**
         * Initiates a read. This handles flushing any data previously written.
         */
        static if (isSink!Stream)
        {
            private void beginRead()
            {
                flush();
            }
        }
        else
        {
            // Nothing to do, this should be optimized away.
            private void beginRead() {}
        }

        /*
         * Copies as much as possible from the read window into buf and
         * advances the window position. The number of bytes copied is
         * returned.
         */
        private size_t readPartial(scope ubyte[] buf)
        {
            import std.algorithm : min;

            // Satisfy what can be copied so far from the buffer.
            immutable satisfiable = min(_valid - _position, buf.length);
            buf[0 .. satisfiable] = _buffer[_position .. _position + satisfiable];
            _position += satisfiable;

            return satisfiable;
        }

        /**
         * Reads data from the stream into the given buffer. The number of bytes
         * read is returned.
         *
         * Requests are served from the internal buffer when possible. If the
         * remainder of a request is at least as large as the internal buffer,
         * it is read directly from the underlying stream instead.
         */
        size_t read(scope ubyte[] buf)
        {
            beginRead();

            immutable satisfied = readPartial(buf);
            if (satisfied == buf.length)
                return satisfied;

            // The read window is now fully consumed.
            buf = buf[satisfied .. $];

            // Large read? Get it directly from the stream.
            if (buf.length >= _buffer.length)
            {
                // The direct read moves the underlying stream past whatever
                // the buffer used to mirror. Drop the (empty) window so that
                // a later relative seek cannot step back into stale bytes.
                _position = _valid = 0;
                return satisfied + stream.read(buf);
            }

            // Buffer is empty, fill it back up.
            immutable bytesRead = stream.read(_buffer);
            _position = 0;
            _valid = bytesRead;

            // Finish the copy
            return satisfied + readPartial(buf);
        }
    }

    static if (isSink!Stream)
    {
        /**
         * Initiates a write. This will handle seeking to the correct position
         * due to a previous read.
         */
        static if (isSource!Stream)
        {
            private void beginWrite()
            {
                if (_valid == 0) return;

                // The unread part of the window indicates how much data hasn't
                // "really" been read from the stream. Seek backwards that
                // distance (expressed as an explicit negative signed offset).
                stream.seekTo(-cast(long)(_valid - _position), From.here);
                _position = _valid = 0;
            }
        }
        else
        {
            // Nothing to do. This should be optimized away.
            private void beginWrite() {}
        }

        /*
         * Copies as much as possible to the stream buffer. The number of bytes
         * copied is returned.
         */
        private size_t writePartial(in ubyte[] buf)
        {
            import std.algorithm : min;

            immutable satisfiable = min(_buffer.length - _position, buf.length);
            _buffer[_position .. _position + satisfiable] = buf[0 .. satisfiable];
            _position += satisfiable;

            return satisfiable;
        }

        /**
         * Writes the given data to the buffered stream. When the internal
         * buffer is completely filled, it is flushed to the underlying stream.
         *
         * If the data left over after filling the buffer is at least as large
         * as the buffer itself, it is written directly to the underlying
         * stream.
         *
         * Returns: The number of bytes accepted.
         */
        size_t write(in ubyte[] buf)
        {
            beginWrite();

            immutable satisfied = writePartial(buf);
            if (satisfied == buf.length)
                return satisfied;

            const(ubyte)[] leftOver = buf[satisfied .. $];

            // Buffer is full and there is more to write. Flush it.
            flush();

            // Large write? Push it directly to the stream.
            if (leftOver.length >= _buffer.length)
                return satisfied + stream.write(leftOver);

            // Write the rest.
            return satisfied + writePartial(leftOver);
        }

        /// Output-range style alias for `write`.
        alias put = write;

        /**
         * Writes any pending data to the underlying stream.
         *
         * Does nothing while the buffer holds a read window, since in that
         * state `_position` is a read cursor rather than a count of pending
         * bytes.
         */
        void flush()
        {
            static if (isSource!Stream)
            {
                if (_valid > 0)
                    return;
            }

            if (_position > 0)
            {
                immutable bytesWritten = stream.write(_buffer[0 .. _position]);
                assert(bytesWritten == _position);
                _position = 0;
            }
        }
    }

    static if (isSeekable!Stream)
    {
        /**
         * Seeks to the given position relative to the given starting point.
         *
         * A seek relative to the current position that stays inside the read
         * window only moves the buffer cursor. Any other seek discards the
         * read window, flushes pending writes and is forwarded to the
         * underlying stream.
         *
         * Returns: The value reported by the underlying stream's `seekTo`,
         * adjusted for buffered-but-unread data when the seek was satisfied
         * inside the window.
         */
        long seekTo(long offset, From from = From.start)
        {
            static if (isSource!Stream)
            {
                if (_valid > 0)
                {
                    if (from == From.here)
                    {
                        // Signed distances from the logical position to the
                        // start and end of the read window. Using signed
                        // bounds avoids relying on unsigned wraparound.
                        immutable long behind = cast(long)_position;
                        immutable long ahead  = cast(long)(_valid - _position);

                        // Can we seek within the buffer?
                        if (offset >= -behind && offset < ahead)
                        {
                            _position = cast(size_t)(behind + offset);
                            return stream.seekTo(0, From.here)
                                - cast(long)(_valid - _position);
                        }

                        // The underlying stream is positioned at the end of
                        // the window, `ahead` bytes past the logical position.
                        // Compensate so the relative seek lands where the
                        // caller expects.
                        offset -= ahead;
                    }

                    // Invalidate the window
                    _position = _valid = 0;
                }
            }

            static if (isSink!Stream)
            {
                flush();
            }

            return stream.seekTo(offset, from);
        }
    }
}

import std.typecons : RefCounted, RefCountedAutoInitialize;

/// Reference-counted handle around `FixedBufferBase`, not auto-initialized.
alias FixedBuffer(Stream) = RefCounted!(FixedBufferBase!(Stream), RefCountedAutoInitialize.no);