Skip to content

Instantly share code, notes, and snippets.

@sam0x17

sam0x17/stream.d Secret

Created March 9, 2019 16:30
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save sam0x17/561de58e69dbe979ba4b1c5042b6c153 to your computer and use it in GitHub Desktop.
Save sam0x17/561de58e69dbe979ba4b1c5042b6c153 to your computer and use it in GitHub Desktop.
D implementation
module streams;
/// Represents an object or stream that can be written to via a `write`
/// method. `Stream` implements this interface and also uses it as the type
/// of the targets it pipes its output to.
interface Writable {
/// Writes data to the underlying stream or interface.
/// Params:
///     data = the bytes to write; received as const by the implementation
void write(const ubyte[] data);
}
/// Encapsulates a stream of data. Can act as an input, an output, or both
/// depending on what methods are defined.
///
/// Bytes handed to `write` are split at input-limit boundaries (see
/// `setLimit`), passed through `transform`, and the transformed bytes are
/// forwarded to every piped `Writable` target and then to `dataCallback`.
abstract class Stream : Writable {
    /// input bytes still accepted before the current limit window is full;
    /// only maintained while `limit > 0`
    protected ulong remainingBytes = 0;
    /// bytes passed to `transform` since construction or the last `resetStats`
    protected ulong totalBytesIn = 0;
    /// bytes emitted through `sendOutputData` since construction or the last
    /// `resetStats`
    protected ulong totalBytesOut = 0;
    /// size of one input window in bytes; 0 means no limit handling
    protected ulong limit = 0;
    /// piped targets, stored as the keys of an associative array used as a
    /// set (`pipe` always stores `true` as the value)
    protected bool[Writable] targets;

    /// Installs no-op delegates as the default callbacks.
    this() {
        dataCallback = delegate void(const ubyte[]) {};
        limitCallback = delegate void() {};
    }

    /// called with every chunk of output data, after it has been written to
    /// all piped targets; may be set to null to disable
    void delegate(const ubyte[]) dataCallback;
    /// called each time a full limit window of input has been consumed
    /// (`remainingBytes` has already been reset to `limit` when it fires);
    /// may be set to null to disable
    void delegate() limitCallback;

    /// Returns: input bytes left in the current limit window (0 when no
    /// limit is set)
    ulong getRemainingBytes() { return remainingBytes; }
    /// Returns: total input bytes processed since the last `resetStats`
    ulong getTotalBytesIn() { return totalBytesIn; }
    /// Returns: total output bytes emitted since the last `resetStats`
    ulong getTotalBytesOut() { return totalBytesOut; }
    /// Returns: the configured input window size (0 = no limit)
    ulong getLimit() { return limit; }

    /// override this method to have this stream apply transformations on
    /// data as it passes through the stream
    /// Params:
    ///     data = one input chunk, never crossing a limit boundary
    /// Returns: the bytes to forward to targets and `dataCallback`
    abstract ubyte[] transform(const ubyte[] data);

    /// Sets the input window size and restarts the current window.
    /// Params:
    ///     limit = window size in bytes; 0 disables limit handling
    void setLimit(ulong limit) {
        this.limit = limit;
        this.remainingBytes = limit;
    }

    /// resets totalBytesIn and totalBytesOut
    void resetStats() {
        totalBytesIn = 0;
        totalBytesOut = 0;
    }

    /// Write data to this Stream.
    ///
    /// When a limit is set, the input is cut so that no chunk passed on to
    /// `transform` crosses a window boundary, and `limitCallback` fires each
    /// time a window is completed. Without a limit the whole input is
    /// processed as a single chunk.
    /// Params:
    ///     data = the bytes to write; an empty array is ignored
    void write(const ubyte[] data) {
        // Non-owning view that is advanced as chunks are consumed. The
        // elements stay const, so no cast away from const is required.
        const(ubyte)[] cursor = data;
        if (cursor.length == 0) return;
        while (true) {
            if (limit > 0 && cursor.length >= remainingBytes) {
                // Safe narrowing: remainingBytes <= cursor.length here, and
                // cursor.length is a size_t. Needed for 32-bit targets.
                const size_t boundary = cast(size_t) remainingBytes;
                const(ubyte)[] deferredData = cursor[boundary .. $];
                _on_data(cursor[0 .. boundary]);
                remainingBytes = limit;
                if (limitCallback !is null) limitCallback();
                if (deferredData.length == 0) break;
                cursor = deferredData;
            } else {
                _on_data(cursor);
                // Only track the window while a limit is active; with
                // limit == 0 remainingBytes is 0 and subtracting would wrap
                // the unsigned counter around.
                if (limit > 0) remainingBytes -= cursor.length;
                break;
            }
        }
    }

    /// called by write method with properly sized data chunks; counts the
    /// input bytes and forwards the transformed chunk to `sendOutputData`
    protected void _on_data(const ubyte[] data) {
        if (data.length == 0) return;
        totalBytesIn += data.length;
        sendOutputData(transform(data));
    }

    /// adds output data manually without calling transform or
    /// triggering input limits (useful for writing headers/footers
    /// and used internally)
    /// Params:
    ///     data = bytes to emit; an empty array is ignored
    void sendOutputData(const ubyte[] data) {
        if (data.length == 0) return;
        totalBytesOut += data.length;
        // NOTE(review): a target that calls pipe/unpipe on this stream from
        // inside its write would modify `targets` while it is being iterated
        // - confirm no caller does that.
        foreach (Writable target, bool piped; targets) {
            target.write(data);
        }
        if (dataCallback !is null) dataCallback(data);
    }

    /// output from this stream will be sent to the specified stream
    /// whenever data passes through this stream
    void pipe(Writable stream) {
        targets[stream] = true;
    }

    /// stop piping to the specified stream
    void unpipe(Writable stream) {
        targets.remove(stream);
    }
}
/// A concrete Stream whose transform step is the identity: each chunk is
/// handed on exactly as it was received.
class PassthroughStream : Stream {
    /// Returns the very same bytes it was given. No copy is made; the const
    /// qualifier is cast away only to satisfy the declared return type.
    override ubyte[] transform(const ubyte[] data) {
        ubyte[] sameBytes = cast(ubyte[]) data;
        return sameBytes;
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment