Skip to content

Instantly share code, notes, and snippets.

@dmorosinotto
Created February 13, 2023 09:30
Show Gist options
  • Save dmorosinotto/3443e66a09e3bcbf1b278a8bbf2ccd70 to your computer and use it in GitHub Desktop.
Save dmorosinotto/3443e66a09e3bcbf1b278a8bbf2ccd70 to your computer and use it in GitHub Desktop.
RxJS - fromReadableStream operator to transform a ReadableStream into an Observable
//ORIGINAL CODE INSPIRED BY Wassim Chegham https://twitter.com/manekinekko/status/1624440889216057347/photo/1
/**
 * Wraps a WHATWG `ReadableStream` in an RxJS `Observable` that emits the
 * stream's content as decoded text chunks.
 *
 * On every subscription the stream is piped through a `TextDecoderStream`,
 * then through a pass-through `TransformStream` (which exists only so the
 * supplied queuing strategies can be applied), and finally into a
 * `WritableStream` that forwards each chunk to the subscriber.
 *
 * Because piping locks the source stream, the returned Observable can only be
 * subscribed to once per `stream` instance; a second subscription makes
 * `pipeThrough` throw because the stream is already locked.
 *
 * @param stream - Source stream; its chunks must be acceptable to `TextDecoderStream`.
 * @param signal - Optional caller-owned signal. Aborting it stops the pipe and
 *   the Observable errors with the resulting rejection.
 * @param writableStrategy - Queuing strategy for the writable side of the
 *   pass-through transform and for the sink that feeds the subscriber.
 * @param readableStrategy - Queuing strategy for the readable side of the
 *   pass-through transform.
 * @returns An Observable that emits each decoded chunk, completes when the
 *   stream ends, and errors when the stream (or the pipe) fails.
 */
export function fromReadableStream(
  stream: ReadableStream,
  signal?: AbortSignal,
  writableStrategy?: QueuingStrategy,
  readableStrategy?: QueuingStrategy
): Observable<string> {
  const createTextDecoderStream = () => new TextDecoderStream();
  const transformer = () =>
    new TransformStream(
      {
        transform(chunk, controller) {
          controller.enqueue(chunk);
        },
      },
      writableStrategy,
      readableStrategy
    );
  const emitter = (subscriber: Subscriber<string>) =>
    new WritableStream(
      {
        write(chunk) {
          subscriber.next(chunk);
        },
      },
      writableStrategy
    );

  return new Observable<string>((subscriber) => {
    // Internal controller so that BOTH the caller's signal and an unsubscribe
    // can stop the pipe; pipeTo accepts only a single signal.
    const pipeAbort = new AbortController();
    const forwardCallerAbort = () => pipeAbort.abort(signal?.reason);
    if (signal?.aborted) {
      forwardCallerAbort();
    }

    stream
      .pipeThrough(createTextDecoderStream())
      .pipeThrough(transformer())
      .pipeTo(emitter(subscriber), { signal: pipeAbort.signal }) // emit next(chunk)
      .then(() => {
        // Stream finished normally -> complete.
        if (!subscriber.closed) {
          subscriber.complete();
        }
      })
      .catch((err: unknown) => {
        // Stream failure or caller abort -> error. After an unsubscribe the
        // subscriber is already closed, so the abort rejection is dropped.
        if (!subscriber.closed) {
          subscriber.error(err);
        }
      });

    // Registered only after the pipe was set up, so a synchronous throw from
    // pipeThrough (e.g. stream already locked) does not leave a dangling listener.
    if (signal && !signal.aborted) {
      signal.addEventListener('abort', forwardCallerAbort, { once: true });
    }

    // Teardown logic: must be returned from inside the subscribe callback to be
    // registered. Aborting the pipe cancels the (locked) source stream through
    // the pipe chain; calling stream.cancel() directly is not possible while
    // the stream is locked by pipeThrough.
    return () => {
      signal?.removeEventListener('abort', forwardCallerAbort);
      pipeAbort.abort();
    };
  });
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment