-
-
Save sam0x17/561de58e69dbe979ba4b1c5042b6c153 to your computer and use it in GitHub Desktop.
D implementation
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
module streams; | |
/// represents an object or stream that can be written to | |
/// via a write method, as used by our Stream class | |
interface Writable { | |
/// writes data to the underlying stream or interface | |
void write(const ubyte[] data); | |
} | |
/// Encapsulates a stream of data. Can act as an input, an output, or both | |
/// depending on what methods are defined. | |
abstract class Stream : Writable { | |
protected ulong remainingBytes = 0; | |
protected ulong totalBytesIn = 0; | |
protected ulong totalBytesOut = 0; | |
protected ulong limit = 0; | |
protected bool[Writable] targets; | |
/// | |
this() { | |
dataCallback = delegate void(const ubyte[]) {}; | |
limitCallback = delegate void() {}; | |
} | |
/// called when data becomes available | |
void delegate(const ubyte[]) dataCallback; | |
/// called when the remainingBytes == limit | |
void delegate() limitCallback; | |
ulong getRemainingBytes() { return remainingBytes; } | |
ulong getTotalBytesIn() { return totalBytesIn; } | |
ulong getTotalBytesOut() { return totalBytesOut; } | |
ulong getLimit() { return limit; } | |
/// override this method to have this stream apply transformations on | |
/// data as it passes through the stream | |
abstract ubyte[] transform(const ubyte[] data); | |
void setLimit(ulong limit) { | |
this.limit = limit; | |
this.remainingBytes = limit; | |
} | |
/// resets totalBytesIn and totalBytesOut | |
void resetStats() { | |
totalBytesIn = 0; | |
totalBytesOut = 0; | |
} | |
/// write data to this Stream | |
void write(const ubyte[] data) { | |
ubyte[] cursor = cast(ubyte[]) data; | |
if(cursor.length == 0) return; | |
while(true) { | |
if(limit > 0 && cursor.length >= remainingBytes) { | |
const ubyte[] deferredData = cursor[remainingBytes..cursor.length]; | |
_on_data(cursor[0..remainingBytes]); | |
remainingBytes = limit; | |
limitCallback(); | |
if(deferredData.length == 0) break; | |
cursor = cast(ubyte[]) deferredData; | |
} else { | |
_on_data(cursor); | |
remainingBytes -= cursor.length; | |
break; | |
} | |
} | |
} | |
/// called by write method with properly sized data chunks | |
protected void _on_data(const ubyte[] data) { | |
if(data.length == 0) return; | |
totalBytesIn += data.length; | |
sendOutputData(transform(data)); | |
} | |
/// adds output data manually without calling transform or | |
/// triggering input limits (useful for writing headers/footers | |
/// and used internally) | |
void sendOutputData(const ubyte[] data) { | |
if(data.length == 0) return; | |
totalBytesOut += data.length; | |
foreach(Writable target, bool value; targets) { | |
target.write(data); | |
} | |
dataCallback(data); | |
} | |
/// output from this stream will be sent to the specified stream | |
/// whenever data passes through this stream | |
void pipe(Writable stream) { | |
targets[stream] = true; | |
} | |
/// stop piping to the specified stream | |
void unpipe(Writable stream) { | |
targets.remove(stream); | |
} | |
} | |
/// represents a basic stream that performs no | |
/// transformations on the data passing it | |
class PassthroughStream : Stream { | |
override ubyte[] transform(const ubyte[] data) { | |
return cast(ubyte[]) data; | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment