Skip to content

Instantly share code, notes, and snippets.

@softwareperson
Last active November 3, 2022 06:14
Show Gist options
  • Save softwareperson/8eab1f94a519338eea812f21fab6a041 to your computer and use it in GitHub Desktop.
Save softwareperson/8eab1f94a519338eea812f21fab6a041 to your computer and use it in GitHub Desktop.
Merge Stream
import { Duplex, DuplexOptions, Readable, TransformCallback, Writable } from 'stream';
import { BufferList } from './internal/streams/BufferList';
/** Completion callback used by the stream plumbing; it receives an error when something failed. */
type StreamCallback = (err?: Error | null) => void;
/**
 * Queue and backpressure bookkeeping for one input of a Merge stream: either the
 * main (writable) input or the merging readable that is currently being spliced in.
 */
interface MergingState {
  /**
   * Callback held back from the writer: a write callback while the queue is full, or
   * the final callback of a merging stream. `null` (or unset) when nothing is pending.
   */
  callback: StreamCallback | null;
  /** Queue size, in bytes, at which incoming write callbacks start being held back. */
  highWaterMark: number;
  /** Number of bytes currently held in `queue`. */
  size: number;
  /** Chunks accepted from the writer that have not been pushed to the readable side yet. */
  queue: BufferList;
  /** Set once the writer has signalled its end; the input is done when `size` reaches 0. */
  finalizing: boolean;
}
/**
 * Creates an empty MergingState.
 *
 * @param highWaterMark queue capacity in bytes; a missing (or otherwise falsy) value becomes 0
 * @returns a fresh state: empty queue, no pending callback, not finalizing
 */
function newMergingState(highWaterMark?: number): MergingState {
  const state: MergingState = {
    highWaterMark: highWaterMark || 0,
    size: 0,
    queue: new BufferList(),
    callback: null,
    finalizing: false,
  };
  return state;
}
/** A readable to splice into the output, together with where to splice it. */
export interface MergingStream {
  /**
   * Byte offset into the main input at which `stream` is inserted, or `Merge.END`
   * to append it once the main input has ended.
   */
  index: number;
  /** The readable whose data is inserted. */
  stream: Readable;
}

/** Duplex options extended with the source of streams to merge. */
export interface MergingOptions extends DuplexOptions {
  /**
   * Factory returning an iterator of merging streams, consumed one at a time.
   * When omitted, a subclass must override `_nextStream` instead.
   */
  nextStream?: () => IterableIterator<MergingStream>;
}
/**
 * Duplex stream that splices ("merges") other readables into the data written to it.
 *
 * Data written to the writable side (normally a main input piped into it) is passed
 * through to the readable side. Merging streams are fetched one at a time from the
 * `nextStream` option or an overridden `_nextStream` generator. Each one carries a byte
 * `index` into the main input. When the output reaches that offset, the merging
 * stream's data is emitted in full before the main input continues. A merging stream
 * with index `Merge.END` is piped in as the new main input once the current one has
 * ended.
 *
 * NOTE(review): the main input is expected to be *piped* in. The writable side is only
 * finalized through the 'end' listener installed in onPipe, so data written with
 * write()/end() directly never completes the merge.
 */
export class Merge extends Duplex {
  /** Initial value of the main-input byte index; restored when an END stream becomes the main input. */
  public static readonly START = 0;
  /** Sentinel merge index: append the merging stream after the main input has ended. */
  public static readonly END = -1;

  /** Source of merging streams, pulled one at a time. */
  private streamGenerator: IterableIterator<MergingStream>;
  /** Merging stream pending or being spliced in; null/undefined when a new one must be fetched. */
  private mergingStream: MergingStream;
  /** Bytes of the current main input pushed so far; compared against MergingStream.index. */
  private index: number;
  /** Writable sink the current merging stream is piped into; it feeds mergingState. */
  private mergeWriteStream: Writable;
  /** Queue/backpressure state of the current merging stream. */
  private mergingState: MergingState;
  /** Queue/backpressure state of the main input (data arriving through _write). */
  private writingState: MergingState;
  /** True while a _read scheduled by mergeNextTick is pending and has not started yet. */
  private mergeSync = false;
  /** True once the readable side has been ended, normally or after an error. */
  private finished = false;

  /**
   * @param opts Duplex options plus an optional `nextStream` factory; without it,
   *   subclasses must override `_nextStream`.
   */
  public constructor(opts?: MergingOptions) {
    super(opts);
    this.index = Merge.START;
    // Placeholder so mergingState is never unset; it is replaced as soon as the first
    // merging stream is fetched.
    this.mergingState = newMergingState();
    this.writingState = newMergingState(this.writableHighWaterMark);
    if (opts && typeof opts.nextStream === 'function') {
      this.streamGenerator = opts.nextStream();
    } else {
      this.streamGenerator = this._nextStream();
    }
    // Intercept pipes so that a source's end does not end this stream (see onPipe).
    this.on('pipe', (readable: Readable) => this.onPipe(readable));
  }

  // -----------------------------------------------------------------------------
  // Overrides
  // -----------------------------------------------------------------------------

  /** Queues main-input data; the callback is held back while the writing queue is full. */
  public _write(chunk: Buffer, _encoding: string, cb: TransformCallback): void {
    this.writeState(chunk, cb, this.writingState);
  }

  /**
   * Pushes as much merged data as the readable buffer has room for. It then checks
   * whether the merging stream or the main input has completed.
   *
   * It is also invoked directly (via mergeNextTick) when new data or an end arrives.
   * Node does not call _read again until data has been pushed.
   *
   * @param size advisory size from Node; unused, room is derived from readableHighWaterMark
   */
  public _read(size: number): void {
    if (this.finished) {
      // we've finished, there is nothing left to read
      return;
    }
    // from here on mergeNextTick may schedule another pass
    this.mergeSync = false;
    let bytesRead = 0;
    do {
      const availableSpace = this.readableHighWaterMark - this.readableLength;
      bytesRead = 0;
      while (bytesRead < availableSpace && !this.finished) {
        let result = 0;
        try {
          result = this.doRead(availableSpace - bytesRead);
        } catch (error) {
          this.fail(error);
          break;
        }
        if (result === 0) {
          // the queue that is due next is empty (for now)
          break;
        }
        bytesRead += result;
      }
    } while (bytesRead > 0 && !this.finished);
    try {
      // may fetch the next merging stream or switch main inputs, both of which can throw
      this.handleFinished();
    } catch (error) {
      this.fail(error);
    }
  }

  /**
   * Default merging-stream generator; must be overridden unless the `nextStream`
   * option is given.
   *
   * @throws Error always
   */
  protected * _nextStream(): IterableIterator<MergingStream> {
    throw new Error('Please override _nextStream or provide nextStream as option');
  }

  // -----------------------------------------------------------------------------
  // MISC
  // -----------------------------------------------------------------------------

  /**
   * Called for every readable piped into this stream. The source's end then
   * finalizes the current writing state and detaches the source, instead of
   * ending this stream.
   */
  private onPipe(readable: Readable): void {
    // pipe() installs an 'end' handler that would end this stream; drop it.
    // NOTE: this removes *every* 'end' listener on the source, not only pipe()'s.
    readable.removeAllListeners('end');
    readable.once('end', () => {
      this.finalizeState(this.writingState);
      // only detach from this stream; leave any other destinations of the source alone
      readable.unpipe(this);
    });
  }

  /** Schedules a single deferred _read; further calls are no-ops until that _read starts. */
  private mergeNextTick(): void {
    if (!this.mergeSync) {
      this.mergeSync = true;
      process.nextTick(() => this._read(this.readableHighWaterMark));
    }
  }

  /** `final` hook of the merging sink: marks the merging state as ending. */
  private onMergeEnd(readable: Readable, cb: StreamCallback): void {
    if (!this.mergingStream || this.mergingStream.stream !== readable) {
      // report through the final callback instead of throwing out of the Writable machinery
      cb(new Error('onMergeEnd callback from wrong incorrect readable'));
      return;
    }
    this.finalizeState(this.mergingState, cb);
  }

  /**
   * Retires the merging stream and/or the main input once they have ended and been
   * fully drained. It then moves on to the next merging stream or the next main input.
   *
   * @throws Error when fetching the next merging stream fails or the state is inconsistent
   */
  private handleFinished(): void {
    if (this.finished) {
      return;
    }
    if (this.isStateFinished(this.mergingState)) {
      // The merging stream is fully drained: release its final callback and drop it.
      this.stateCallbackAndSet(this.mergingState, null);
      this.mergingStream = null;
      // Retire the drained state. Otherwise it keeps reporting "finished" and every pass
      // reschedules another _read, spinning while the readable buffer is full.
      this.mergingState = newMergingState();
      // continue with the main input
      this.mergeNextTick();
    }
    // Retire the main input only when its queue is drained AND the writable side holds
    // no chunks that have not reached _write yet; otherwise those chunks would be lost.
    if (this.isStateFinished(this.writingState) && this.writableLength === 0) {
      this.stateCallbackAndSet(this.writingState, null);
      if (this.mergingStream == null) {
        // check one last time for a next merging stream
        this.setNewMergeStream(this.streamGenerator.next().value);
      }
      // Ends the output, waits for a merging stream at the current index, or pipes in
      // an END stream. Each of those wakes _read by itself (writes, end, or finish), so
      // no extra pass is scheduled here.
      this.nextMainStream();
    }
  }

  /**
   * Ends the readable side after an unrecoverable error and reports the error.
   * State is updated before emitting, because emitting 'error' without a listener throws.
   */
  private fail(error: unknown): void {
    this.finished = true;
    this.push(null);
    this.emit('error', error);
  }

  // -----------------------------------------------------------------------------
  // READING
  // -----------------------------------------------------------------------------

  /**
   * Reads at most `n` bytes from whichever queue is due next.
   *
   * Main-input bytes are read up to, but not past, the next merge index. At the
   * merge index the merging stream's queue is read instead.
   *
   * @param n number of bytes that may still be pushed before readableHighWaterMark is hit
   * @returns the number of bytes pushed (0 when the due queue is empty)
   * @throws Error when fetching the next merging stream fails
   */
  private doRead(n: number): number {
    const nextMergingIndex = this.getNextMergingIndex();
    if (nextMergingIndex === Merge.END) {
      // no positional merge pending: pass the main input through
      return this.readWritingState(n);
    }
    const bytesToNextIndex = nextMergingIndex - this.index;
    if (bytesToNextIndex === 0) {
      // we are at the merge index, so the merging queue is due
      return this.readState(n, this.mergingState);
    }
    return this.readWritingState(Math.min(n, bytesToNextIndex));
  }

  /**
   * Reads from the main input and advances the main-input byte index.
   *
   * @param n maximum number of bytes to read
   * @returns number of bytes pushed
   */
  private readWritingState(n: number): number {
    const bytesRead = this.readState(n, this.writingState);
    this.index += bytesRead;
    return bytesRead;
  }

  // -----------------------------------------------------------------------------
  // MAIN STREAM
  // -----------------------------------------------------------------------------

  /**
   * Decides what follows a finished main input: end the output, splice a merging
   * stream positioned exactly at its end, or make an END stream the new main input.
   *
   * @throws Error when the pending merging stream's index lies beyond the end of the main input
   */
  private nextMainStream(): void {
    if (!this.mergingStream) {
      // nothing left to merge: the output is complete
      this.finished = true;
      this.push(null);
      return;
    }
    if (this.index === this.mergingStream.index) {
      // A merging stream sits exactly at the end of the main input; it is read by
      // doRead, and its own writes/end schedule the reads.
      return;
    }
    if (this.mergingStream.index === Merge.END) {
      // the END stream becomes the new main input; its bytes are counted from START again
      this.index = Merge.START;
      this.writingState = this.mergingState;
      this.mergingStream.stream.pipe(this);
      this.mergingStream = null;
    } else {
      // reached when the merge index lies beyond the end of the main input
      throw new Error('Corrupt state, contact your local government');
    }
  }

  // -----------------------------------------------------------------------------
  // MERGING STREAM
  // -----------------------------------------------------------------------------

  /**
   * Returns the next merging index, fetching a new merging stream first if none is set.
   *
   * @returns the next merging index, or Merge.END if there is no merging stream
   * @throws Error when the stream generator yields an invalid merging stream
   */
  private getNextMergingIndex(): number {
    if (!this.mergingStream) {
      this.setNewMergeStream(this.streamGenerator.next().value);
      if (!this.mergingStream) {
        return Merge.END;
      }
    }
    return this.mergingStream.index;
  }

  /**
   * Adopts `mergingStream` as the current merging stream.
   *
   * Positional streams are piped into a fresh sink feeding a fresh merging state.
   * END streams (and "no stream") only get a fresh state; nextMainStream pipes END
   * streams in later.
   *
   * @throws Error when a merging stream is already set, or the index lies before the current index
   */
  private setNewMergeStream(mergingStream?: MergingStream): void {
    if (this.mergingStream) {
      throw new Error('There already is a merging stream');
    }
    if (mergingStream != null && mergingStream.index !== Merge.END && mergingStream.index < this.index) {
      // validate before adopting it, so a rejected stream never becomes the current one
      throw new Error('Cannot merge at ' + mergingStream.index + ' because current index is ' + this.index);
    }
    this.mergingStream = mergingStream;
    if (mergingStream == null || mergingStream.index === Merge.END) {
      // handled further once nextMainStream() is called
      this.mergingState = newMergingState(this.writableHighWaterMark);
      return;
    }
    // sink the new merging stream writes to
    this.mergeWriteStream = new Writable({
      write: (chunk, _encoding, cb) => this.writeMerge(mergingStream.stream, chunk, cb),
      final: (cb: StreamCallback) => {
        this.onMergeEnd(mergingStream.stream, cb);
      },
    });
    this.mergingState = newMergingState(this.mergeWriteStream.writableHighWaterMark);
    mergingStream.stream.pipe(this.mergeWriteStream);
  }

  /** Write hook of the merging sink: queues data from the current merging stream only. */
  private writeMerge(readable: Readable, chunk: Buffer, cb: StreamCallback): void {
    if (!this.mergingStream || this.mergingStream.stream !== readable) {
      // the readable is in an invalid state, it should have ended
      readable.unpipe();
      cb(new Error('cannot write after stream has ended'));
      return;
    }
    this.writeState(chunk, cb, this.mergingState);
  }

  // -----------------------------------------------------------------------------
  // STATE: READ, WRITE, END, CALLBACK
  // -----------------------------------------------------------------------------

  /** True when `state` may accept more data (an empty queue always may, even with highWaterMark 0). */
  private hasRoom(state: MergingState): boolean {
    return state.size === 0 || state.size < state.highWaterMark;
  }

  /**
   * Pushes up to `size` queued bytes of `state` to the readable side.
   *
   * @param size maximum number of bytes to consume
   * @param state the state to read from
   * @returns the number of bytes pushed
   */
  private readState(size: number, state: MergingState): number {
    if (state.size === 0) {
      return 0;
    }
    const buffer = state.queue.consume(Math.min(size, state.size), false);
    this.push(buffer);
    state.size -= buffer.length;
    // Release a held-back write callback only once the queue has room again. Since
    // writeState accepts whole chunks, the queue stays below highWaterMark plus one chunk.
    if (this.hasRoom(state)) {
      this.stateCallbackAndSet(state, null);
    }
    return buffer.length;
  }

  /**
   * Queues `chunk` in `state`. Backpressure comes from holding back `cb` while the
   * queue is full, not from splitting the chunk.
   *
   * (The previous implementation unshifted overflow bytes onto the source. For the main
   * input that source is this Duplex itself, which injected them straight into the
   * output and bypassed the merge ordering.)
   *
   * @param chunk the chunk to queue
   * @param cb the writer's callback; called now if there is room, otherwise held in `state`
   * @param state the state to write to
   */
  private writeState(chunk: Buffer, cb: StreamCallback, state: MergingState): void {
    this.mergeNextTick();
    if (chunk.length > 0) {
      state.queue.push(chunk);
      state.size += chunk.length;
    }
    if (this.hasRoom(state)) {
      cb();
    } else {
      this.stateCallbackAndSet(state, cb);
    }
  }

  /**
   * @param state the state to check
   * @returns true when the state's writer has ended and its queue is empty
   */
  private isStateFinished(state: MergingState): boolean {
    return Boolean(state) && state.finalizing && state.size <= 0;
  }

  /**
   * Puts a state in finalizing mode: its writer has ended, and the state is finished
   * once its queue is drained.
   *
   * @param state the state to finalize
   * @param cb optional final callback, held until the state is drained (or has room)
   */
  private finalizeState(state: MergingState, cb?: StreamCallback): void {
    state.finalizing = true;
    this.stateCallbackAndSet(state, cb);
    this.mergeNextTick();
  }

  /**
   * Invokes the callback currently held by `state` (if any) on the next tick, and
   * stores `cb` in its place.
   *
   * The call is deferred so the writer is not re-entered synchronously. A _read is
   * scheduled after it runs, because releasing a callback can let buffered writes
   * through or complete a pending end.
   */
  private stateCallbackAndSet(state: MergingState, cb: StreamCallback): void {
    if (!state) {
      return;
    }
    const pending = state.callback;
    state.callback = cb;
    if (pending) {
      process.nextTick(() => {
        pending();
        this.mergeNextTick();
      });
    }
  }
}
/**
 * One node of the singly linked list backing BufferList.
 */
interface Entry {
  /** Stored chunk: a Buffer, or a string when the list is used for string data (hence `any`). */
  data: any;
  /** Following node, or null for the last node. */
  next: Entry | null;
}

/**
 * TypeScript port of Node.js' internal `BufferList`: a FIFO of Buffer (or string)
 * chunks that supports consuming an arbitrary number of bytes/characters across
 * chunk boundaries.
 *
 * `length` is the number of chunks in the list, not the number of bytes.
 */
export class BufferList {
  /** Number of chunks (list nodes) currently stored. Must start at 0, not undefined. */
  public length = 0;
  private head: Entry | null = null;
  private tail: Entry | null = null;

  /** Appends a chunk to the end of the list. */
  public push(v: any): void {
    const entry: Entry = { data: v, next: null };
    if (this.tail) {
      this.tail.next = entry;
    } else {
      this.head = entry;
    }
    this.tail = entry;
    ++this.length;
  }

  /** Prepends a chunk to the front of the list. */
  public unshift(v: any): void {
    const entry: Entry = { data: v, next: this.head };
    if (!this.tail) {
      this.tail = entry;
    }
    this.head = entry;
    ++this.length;
  }

  /** Removes and returns the first chunk, or undefined when the list is empty. */
  public shift(): any {
    const head = this.head;
    if (!head) {
      return undefined;
    }
    this.head = head.next;
    if (!this.head) {
      this.tail = null;
    }
    --this.length;
    return head.data;
  }

  /** Removes all chunks. */
  public clear(): void {
    this.head = this.tail = null;
    this.length = 0;
  }

  /** Joins the string form of all chunks with separator `s`. */
  public join(s: string): string {
    if (!this.head) {
      return '';
    }
    let ret = '' + this.head.data;
    for (let p = this.head.next; p; p = p.next) {
      ret += s + p.data;
    }
    return ret;
  }

  /**
   * Copies all Buffer chunks into one new buffer of `n` bytes, without consuming them.
   *
   * @param n total byte length to allocate; expected to equal the sum of chunk lengths
   */
  public concat(n: number): Buffer {
    if (this.length === 0) {
      return Buffer.alloc(0);
    }
    // tslint:disable-next-line:no-bitwise
    const ret = Buffer.allocUnsafe(n >>> 0);
    let i = 0;
    for (let p = this.head; p; p = p.next) {
      p.data.copy(ret, i);
      i += p.data.length;
    }
    return ret;
  }

  /**
   * Consumes `n` bytes (or characters, when `hasStrings`) from the front of the list.
   * Requesting more than is stored consumes and returns everything that is available.
   *
   * @throws Error when the list is empty
   */
  public consume(n: number, hasStrings: boolean): Buffer | string {
    const head = this.head;
    if (!head) {
      throw new Error('Cannot consume from an empty BufferList');
    }
    const first = head.data;
    if (n < first.length) {
      // `slice` is the same for buffers and strings.
      head.data = first.slice(n);
      return first.slice(0, n);
    }
    if (n === first.length) {
      // first chunk is a perfect match
      return this.shift();
    }
    // result spans more than one chunk
    return hasStrings ? this._getString(n) : this._getBuffer(n);
  }

  /** Returns the first chunk without removing it, or undefined when the list is empty. */
  public first(): any {
    return this.head ? this.head.data : undefined;
  }

  /** Consumes `n` characters spanning several string chunks (n > length of the first chunk). */
  private _getString(n: number): string {
    const first = this.head;
    if (!first) {
      return '';
    }
    let ret: string = first.data;
    let remaining = n - ret.length;
    let consumed = 1;
    let p = first.next;
    while (p) {
      const str: string = p.data;
      const nb = Math.min(remaining, str.length);
      ret += nb === str.length ? str : str.slice(0, nb);
      remaining -= nb;
      if (remaining === 0) {
        if (nb === str.length) {
          // this chunk was used up completely
          ++consumed;
          this.head = p.next;
          if (!p.next) {
            this.tail = null;
          }
        } else {
          // keep the unread rest of this chunk at the head
          this.head = p;
          p.data = str.slice(nb);
        }
        break;
      }
      ++consumed;
      p = p.next;
    }
    this.length -= consumed;
    if (remaining > 0) {
      // asked for more than was stored: every chunk has been consumed
      this.head = this.tail = null;
    }
    return ret;
  }

  /** Consumes `n` bytes spanning several Buffer chunks (n > length of the first chunk). */
  private _getBuffer(n: number): Buffer {
    const first = this.head;
    if (!first) {
      return Buffer.alloc(0);
    }
    const ret = Buffer.allocUnsafe(n);
    first.data.copy(ret);
    let remaining = n - first.data.length;
    let consumed = 1;
    let p = first.next;
    while (p) {
      const buf: Buffer = p.data;
      const nb = Math.min(remaining, buf.length);
      buf.copy(ret, n - remaining, 0, nb);
      remaining -= nb;
      if (remaining === 0) {
        if (nb === buf.length) {
          // this chunk was used up completely
          ++consumed;
          this.head = p.next;
          if (!p.next) {
            this.tail = null;
          }
        } else {
          // keep the unread rest of this chunk at the head
          this.head = p;
          p.data = buf.subarray(nb);
        }
        break;
      }
      ++consumed;
      p = p.next;
    }
    this.length -= consumed;
    if (remaining > 0) {
      // Asked for more than was stored: everything has been consumed. Do not hand
      // out the uninitialized tail of the allocation.
      this.head = this.tail = null;
      return ret.subarray(0, n - remaining);
    }
    return ret;
  }
}
@softwareperson
Copy link
Author

Included a TypeScript port of the Node.js internal BufferList for completeness; all rights to the BufferList code are reserved by the Node.js project.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment