Skip to content

Instantly share code, notes, and snippets.

@shobhitg
Last active July 7, 2019 05:10
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save shobhitg/ca3b5cb30279f91454835e155eab453d to your computer and use it in GitHub Desktop.
Save shobhitg/ca3b5cb30279f91454835e155eab453d to your computer and use it in GitHub Desktop.
RxJS fromStream Adapter
/**
 * Adapts a Node.js `Readable` into a shared Observable of fixed-size binary frames.
 *
 * Incoming chunks are accumulated. The first `headerSize` bytes are emitted as one
 * `Buffer`; after that, every `recordSize` bytes are emitted as one `Buffer` each.
 * Bytes that do not yet fill a frame are held back until more data arrives; a
 * trailing partial frame is not emitted when the finish event fires.
 *
 * The stream is paused immediately, resumed when the Observable is subscribed, and
 * paused again on teardown so that no chunks are discarded while nobody listens.
 * The framing state is kept per stream (not per subscription), so a later
 * subscription continues at the correct frame boundary.
 *
 * The type parameter `T` is unused; it is kept so existing call sites that pass it
 * keep compiling.
 *
 * @param stream Readable to consume. Assumes chunks arrive as `Uint8Array`/`Buffer`
 *   (i.e. no encoding set, not object mode) — confirm against callers.
 * @param finishEventName Event that completes the Observable.
 * @param dataEventName Event that delivers chunks.
 * @param headerSize Size in bytes of the single leading frame (non-negative integer).
 * @param recordSize Size in bytes of every following frame (positive integer).
 * @returns Observable emitting the header frame followed by the record frames.
 * @throws RangeError if `headerSize` or `recordSize` is not a valid size.
 */
export default function fromStream<T = Uint8Array>(
  stream: Readable,
  finishEventName = "end",
  dataEventName = "data",
  headerSize = 24,
  recordSize = 52
): Observable<Buffer> {
  if (!Number.isInteger(headerSize) || headerSize < 0) {
    throw new RangeError(
      `headerSize must be a non-negative integer, got ${headerSize}`
    );
  }
  // A record size of zero would make the framing loop below spin forever.
  if (!Number.isInteger(recordSize) || recordSize <= 0) {
    throw new RangeError(
      `recordSize must be a positive integer, got ${recordSize}`
    );
  }

  stream.pause();

  let gotHeader = false;
  let backStream: Buffer = Buffer.alloc(0);

  return new Observable<Buffer>(observer => {
    function dataHandler(data: Uint8Array) {
      let size = gotHeader ? recordSize : headerSize;
      backStream = Buffer.concat([backStream, data]);
      while (backStream.length >= size) {
        observer.next(backStream.subarray(0, size));
        backStream = backStream.subarray(size);
        if (!gotHeader) {
          gotHeader = true;
          size = recordSize;
        }
      }
    }
    function errorHandler(err: Error) {
      observer.error(err);
    }
    function endHandler() {
      observer.complete();
    }

    stream.addListener(dataEventName, dataHandler);
    stream.addListener("error", errorHandler);
    stream.addListener(finishEventName, endHandler);
    stream.resume();

    return () => {
      stream.removeListener(dataEventName, dataHandler);
      stream.removeListener("error", errorHandler);
      stream.removeListener(finishEventName, endHandler);
      // Removing the data listener does not stop a flowing stream; pause it so
      // chunks are not dropped before the next subscription resumes it.
      stream.pause();
    };
  }).pipe(share());
}
import { Observable } from "rxjs";
import { share } from "rxjs/operators";
import { Readable } from "stream";
// Borrowed this excellent function from https://stackoverflow.com/a/42174935
// This function fromStream() should have been provided by default in RxJS library,
// but for some reason it's not included.
/**
 * Adapts a Node.js `Readable` into a shared Observable that emits the stream's
 * content one byte at a time.
 *
 * The stream is paused immediately, resumed when the Observable is subscribed, and
 * paused again on teardown so that no chunks are discarded while nobody listens.
 *
 * The type parameter `T` is unused; it is kept so existing call sites that pass it
 * keep compiling.
 *
 * @param stream Readable to consume. Assumes chunks arrive as `Uint8Array`/`Buffer`
 *   (i.e. no encoding set, not object mode) — confirm against callers.
 * @param finishEventName Event that completes the Observable.
 * @param dataEventName Event that delivers chunks.
 * @returns Observable emitting each byte of every chunk, in order.
 */
export default function fromStream<T = Uint8Array>(
  stream: Readable,
  finishEventName = "end",
  dataEventName = "data"
): Observable<number> {
  stream.pause();

  return new Observable<number>(observer => {
    function dataHandler(data: Uint8Array) {
      // Emit byte by byte so downstream buffering can work with plain byte counts.
      data.forEach(byte => observer.next(byte));
    }
    function errorHandler(err: Error) {
      observer.error(err);
    }
    function endHandler() {
      observer.complete();
    }

    stream.addListener(dataEventName, dataHandler);
    stream.addListener("error", errorHandler);
    stream.addListener(finishEventName, endHandler);
    stream.resume();

    return () => {
      stream.removeListener(dataEventName, dataHandler);
      stream.removeListener("error", errorHandler);
      stream.removeListener(finishEventName, endHandler);
      // Removing the data listener does not stop a flowing stream; pause it so
      // chunks are not dropped before the next subscription resumes it.
      stream.pause();
    };
  }).pipe(share());
}
import { Observable } from "rxjs";
import { share } from "rxjs/operators";
import { Readable } from "stream";
import { Buffer } from "buffer";
// Borrowed this excellent function from https://stackoverflow.com/a/42174935
// This function fromStream() should have been provided by default in RxJS library,
// but for some reason it's not included.
/**
 * Adapts a Node.js `Readable` into a shared Observable of the stream's raw chunks.
 *
 * The stream is paused immediately, resumed when the Observable is subscribed, and
 * paused again on teardown so that no chunks are discarded while nobody listens.
 *
 * The type parameter `T` is unused; it is kept so existing call sites that pass it
 * keep compiling.
 *
 * @param stream Readable to consume. Assumes chunks arrive as `Uint8Array`/`Buffer`
 *   (i.e. no encoding set, not object mode) — confirm against callers.
 * @param finishEventName Event that completes the Observable.
 * @param dataEventName Event that delivers chunks.
 * @returns Observable emitting every chunk unchanged, in arrival order.
 */
export function fromStream<T = Uint8Array>(
  stream: Readable,
  finishEventName = "end",
  dataEventName = "data"
): Observable<Uint8Array> {
  stream.pause();

  return new Observable<Uint8Array>(observer => {
    function dataHandler(data: Uint8Array) {
      observer.next(data);
    }
    function errorHandler(err: Error) {
      observer.error(err);
    }
    function endHandler() {
      observer.complete();
    }

    stream.addListener(dataEventName, dataHandler);
    stream.addListener("error", errorHandler);
    stream.addListener(finishEventName, endHandler);
    stream.resume();

    return () => {
      stream.removeListener(dataEventName, dataHandler);
      stream.removeListener("error", errorHandler);
      stream.removeListener(finishEventName, endHandler);
      // Removing the data listener does not stop a flowing stream; pause it so
      // chunks are not dropped before the next subscription resumes it.
      stream.pause();
    };
  }).pipe(share());
}
/**
 * Re-frames an Observable of arbitrary byte chunks into fixed-size records.
 *
 * Chunks from `stream` are accumulated. If `headerSize` is given, the first
 * `headerSize` bytes are emitted as one `Buffer`; after that (or from the start when
 * there is no header) every `recordSize` bytes are emitted as one `Buffer` each.
 * Bytes that do not yet fill a frame are held back until more data arrives; a
 * trailing partial frame is not emitted when the source completes.
 *
 * Errors and completion of the source are forwarded, and the source subscription is
 * released on teardown. Framing state is created per subscription to the source, so
 * every fresh subscription starts with an empty buffer and a pending header.
 *
 * @param stream Source of byte chunks.
 * @param recordSize Size in bytes of each record (positive integer).
 * @param headerSize Optional size in bytes of a single leading header frame
 *   (non-negative integer). Omit, or pass `null`/`undefined`, for no header.
 * @returns Shared Observable emitting the header (if any) followed by the records.
 * @throws RangeError if `recordSize` or `headerSize` is not a valid size.
 */
export const toRecordStream = (
  stream: Observable<Uint8Array>,
  recordSize: number,
  headerSize?: number
): Observable<Buffer> => {
  // A record size of zero would make the framing loop below spin forever.
  if (!Number.isInteger(recordSize) || recordSize <= 0) {
    throw new RangeError(
      `recordSize must be a positive integer, got ${recordSize}`
    );
  }
  if (headerSize != null && (!Number.isInteger(headerSize) || headerSize < 0)) {
    throw new RangeError(
      `headerSize must be a non-negative integer, got ${headerSize}`
    );
  }

  // Size of the very first frame: the header when one is configured, else a record.
  const firstFrameSize = headerSize == null ? recordSize : headerSize;

  return new Observable<Buffer>(observer => {
    let pending: Buffer = Buffer.alloc(0);
    // Starts at the first-frame size and switches to recordSize after one emission.
    let frameSize = firstFrameSize;

    const subscription = stream.subscribe({
      next(data: Uint8Array) {
        pending = Buffer.concat([pending, data]);
        while (pending.length >= frameSize) {
          observer.next(pending.subarray(0, frameSize));
          pending = pending.subarray(frameSize);
          frameSize = recordSize;
        }
      },
      error(err: unknown) {
        observer.error(err);
      },
      complete() {
        observer.complete();
      },
    });

    return () => subscription.unsubscribe();
  }).pipe(share());
};
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment