@alexreardon
Last active December 2, 2023 12:56
rxjs file stream
// @flow
import fs from 'fs';
import { Observable } from 'rxjs';
type Options = {
  highWaterMark: number,
  encoding: string,
};

const defaultOptions: Options = {
  // number of bytes to read at a time
  highWaterMark: 1,
  encoding: 'utf8',
};
export default (path: string, options?: Options = defaultOptions) =>
  new Observable((observer) => {
    const file$ = fs.createReadStream(path, options);

    // forward each chunk, then complete or error along with the stream
    file$.on('data', (chunk) => observer.next(chunk));
    file$.on('end', () => observer.complete());
    file$.on('close', () => observer.complete());
    file$.on('error', (error) => observer.error(error));

    // on unsubscribe, destroy the stream so the underlying file descriptor is released
    return () => file$.destroy();
  });
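
To make the effect of the default options concrete, here is a minimal sketch that uses only Node's `fs` module (no RxJS) to show what `highWaterMark: 1` with `encoding: 'utf8'` does to the chunks the observable would forward. The helper `readChunks` and the temporary file name are hypothetical, introduced only for this illustration. The sketch uses `require` so it runs as a plain Node script, and assumes an ASCII file, where one byte is one character.

```javascript
const fs = require('fs');
const os = require('os');
const path = require('path');

// Hypothetical helper: collects every chunk a read stream emits.
function readChunks(filePath, options) {
  return new Promise((resolve, reject) => {
    const chunks = [];
    const stream = fs.createReadStream(filePath, options);
    stream.on('data', (chunk) => chunks.push(chunk));
    stream.on('end', () => resolve(chunks));
    stream.on('error', reject);
  });
}

// Write a small ASCII file to a temporary location for the demo.
const demoFile = path.join(os.tmpdir(), `rxjs-file-stream-demo-${process.pid}.txt`);
fs.writeFileSync(demoFile, 'abc');

// With highWaterMark: 1 the stream reads one byte per chunk,
// so ASCII input arrives as one single-character string at a time.
readChunks(demoFile, { highWaterMark: 1, encoding: 'utf8' }).then((chunks) => {
  console.log(chunks);
});
```

A larger `highWaterMark` would produce fewer, bigger chunks; one byte at a time is convenient for character-level processing but slow for large files.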
@alexreardon
Author

Super simple and tailored to my specific use case. Not super feature-rich, but I thought others might be interested anyway.

@Scarysize

Nice! If you create the read stream using a file descriptor instead of a path, you can call fs.close(fd). This closes the opened file descriptor and in turn closes the read stream (which will emit a close event).
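
A rough sketch of the file-descriptor approach, using only Node's `fs` module. The helper `readThenClose` and the temporary file name are hypothetical. One assumption to flag: instead of calling `fs.close(fd)` directly, the sketch calls `stream.destroy()`, which as far as I know closes the descriptor in current Node versions and then emits `close`. I have not verified that a bare `fs.close(fd)` on its own makes the stream emit `close`.

```javascript
const fs = require('fs');
const os = require('os');
const path = require('path');

// Hypothetical helper: opens the file itself, streams from the descriptor,
// and tears the stream down after the first chunk, as an unsubscribe would.
function readThenClose(filePath) {
  return new Promise((resolve, reject) => {
    const fd = fs.openSync(filePath, 'r');
    // When `fd` is supplied, the path argument is ignored by the stream.
    const stream = fs.createReadStream(filePath, {
      fd,
      highWaterMark: 1,
      encoding: 'utf8',
    });
    const seen = [];

    stream.on('data', (chunk) => {
      seen.push(chunk);
      // Assumption: destroy() releases the descriptor and triggers 'close'.
      stream.destroy();
    });
    stream.on('close', () => resolve({ seen, destroyed: stream.destroyed }));
    stream.on('error', reject);
  });
}

// Small ASCII file in a temporary location for the demo.
const fdDemoFile = path.join(os.tmpdir(), `rxjs-fd-demo-${process.pid}.txt`);
fs.writeFileSync(fdDemoFile, 'abc');

readThenClose(fdDemoFile).then((result) => {
  console.log(result.seen[0], result.destroyed);
});
```

In the observable from the gist, the same teardown would go in the function returned to the subscriber, so unsubscribing releases the file.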
