Skip to content

Instantly share code, notes, and snippets.

@mmyoji
Last active October 1, 2021 08:40
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mmyoji/45aca237bc725b800e785064339e2a07 to your computer and use it in GitHub Desktop.
Save mmyoji/45aca237bc725b800e785064339e2a07 to your computer and use it in GitHub Desktop.
[Blog][Node.js] stream.Readable
interface Post {
title: string;
}
interface FetchPostsArgs {
skip: number;
limit: number;
}
const BATCH_SIZE = 2;
const posts: Post[] = [
{ title: "foo" },
{ title: "bar" },
{ title: "buz" },
];
function fetchPosts({
skip,
limit,
}: FetchPostsArgs): Promise<Post[]> {
return Promise.resolve(posts.slice(skip, skip + limit));
}
import { Readable } from "stream";
// Add `read` option when initializing `Readable`.
function createPostsStream(batchSize: number = BATCH_SIZE): Readable {
const limit = batchSize;
return new Readable({
objectMode: true,
async read(_size: number) {
let skip = 0;
let data = await fetchPosts({ skip, limit });
while (data.length) {
this.push(data);
skip += limit;
data = await fetchPosts({ skip, limit });
}
this.push(null); // Notify the stream ends.
},
});
}
// or create a subclass of Readable with _read() method
import { Readable } from "stream";
// Read.from with (async) generator func.
async function* _createPostsStreamGenerator(limit: number) {
let skip = 0;
let data = await fetchPosts({ skip, limit });
while (data.length) {
yield data;
skip += limit;
data = await fetchPosts({ skip, limit });
}
}
function createPostsStream(batchSize: number = BATCH_SIZE): Readable {
return Readable.from(_createPostsStreamGenerator(batchSize));
}
import { Writable } from "stream";
import { finished, pipeline } from "stream/promises";
// Create Writable and piped
function createReceiverStream() {
return new Writable({
objectMode: true,
write(chunk: Post[], _encoding: BufferEncoding, callback: Function) {
console.log("type:", typeof chunk);
console.log("data:", chunk);
callback();
},
});
}
(async () => {
const src = createPostsStream();
const dest = createReceiverStream();
await finished(src.pipe(dest));
// or
// await pipeline(src, dest);
})()
.catch(console.error);
import { finished, pipeline } from "stream/promises";
(async () => {
const src = createPostsStrem();
// Subscribe events
src.on("data", (chunk) => {
console.log(typeof chunk); // object
console.log(chunk); // [{ title: "foo" }, ...]
});
src.on("end", () => {
console.log("Stream END");
});
src.on("error", (err) => {
console.error(err);
});
await finished(src);
})()
.catch(console.error);
(async () => {
// for await
for await (const chunk of createPostsStream()) {
console.log(chunk);
}
})()
.catch(console.error);
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment