Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
function xstreamToPullStream<T>(stream: Stream<T>): Readable<T> {
let hasBufferVal: boolean;
let bufferVal: T;
let bufferEndOrErr: any;
let callback: Callback<T>;
const listener: Listener<T> = {
next: t => {
console.log('pull stream Listener for xstream', t);
if (callback) {
console.log('pull stream direct', t);
callback(null, t);
} else {
hasBufferVal = true;
bufferVal = t;
}
},
error: err => {
if (callback) {
callback(err);
} else {
bufferEndOrErr = err;
}
},
complete: () => {
if (callback) {
callback(true);
} else {
bufferEndOrErr = true;
}
},
};
stream.addListener(listener);
return (endOrErr: any, cb: Callback<T>) => {
callback = cb;
if (endOrErr) {
stream.removeListener(listener);
return cb(endOrErr);
}
if (bufferEndOrErr) {
return cb(bufferEndOrErr);
}
if (hasBufferVal) {
console.log('pull stream buffered', bufferVal);
hasBufferVal = false;
return cb(null, bufferVal);
}
};
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.