Skip to content

Instantly share code, notes, and snippets.

@bellbind
Last active August 21, 2022 09:32
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save bellbind/76949e3ce7d73fda8cdd74e1306f1d92 to your computer and use it in GitHub Desktop.
Save bellbind/76949e3ce7d73fda8cdd74e1306f1d92 to your computer and use it in GitHub Desktop.
[nodejs][worker_threads][es2018] req/res to Worker with for-await loop of ES2018
// node.js worker_threads module example with for-await loop: main thread side
//$ node --experimental-worker main.js
const path = require("path");
const {Worker, MessageChannel} = require("worker_threads");
// async queue as "asyncIterator" in ES2018
// Rendezvous queue exposed as an ES2018 async iterator.
// Producers call post()/close(); a consumer calls next() (typically through
// `for await`). Each produced record is handed to exactly one next() call,
// in FIFO order on both sides.
class Queue {
  constructor() {
    // Resolvers of producers (post/close) waiting for a consumer to arrive.
    this.polls = [];
    // Resolvers of consumers (next) waiting for a producer to arrive.
    this.gets = [];
    // True once the {done: true} record has been delivered to a consumer.
    this.closed = false;
  }

  // The queue is its own async iterator.
  [Symbol.asyncIterator]() {
    return this;
  }

  // Returns a promise of the next iterator result ({value, done}).
  // Once the queue has finished, every call resolves immediately with
  // {done: true} instead of waiting forever for a producer.
  next() {
    if (this.closed) return Promise.resolve({value: undefined, done: true});
    return new Promise(resolve => {
      // Hand our resolver to the oldest waiting producer, or wait for one.
      if (this.polls.length > 0) this.polls.shift()(resolve);
      else this.gets.push(resolve);
    });
  }

  // Returns a promise of a consumer's resolver function ("get").
  // After the queue has finished it resolves with a function that discards
  // its argument, so late producers settle instead of hanging.
  poll() {
    return new Promise(resolve => {
      if (this.closed) resolve(() => {});
      else if (this.gets.length > 0) resolve(this.gets.shift());
      else this.polls.push(resolve);
    });
  }

  // Enqueue `value` as an iterator result; the returned promise settles
  // when a consumer has taken it.
  post(value) {
    return this.poll().then(get => get({value, done: false}));
  }

  // Enqueue the end-of-iteration record; the returned promise settles when
  // a consumer has received it. Values posted after close() are discarded.
  close() {
    return this.poll().then(get => {
      const finished = {value: undefined, done: true};
      this.closed = true;
      get(finished);
      // Release any other consumers that were already waiting...
      for (const waiting of this.gets.splice(0)) waiting(finished);
      // ...and any producers queued behind the close.
      for (const producer of this.polls.splice(0)) producer(() => {});
    });
  }
}
// Sends `data` to `worker` along with a dedicated MessagePort and yields
// every `data` payload the worker posts back on that port, until the port
// is closed.
//   worker: object with postMessage(value, transferList) (a Worker)
//   data:   payload forwarded to the worker as {data, ports}
async function* send(worker, data) {
  const mc = new MessageChannel();
  // port1 feeds an async-iterable queue
  const queue = new Queue();
  // `chunk` avoids shadowing the `data` parameter
  mc.port1.on("message", ({data: chunk}) => queue.post(chunk));
  mc.port1.on("close", () => queue.close());
  try {
    const ports = [mc.port2];
    worker.postMessage({data, ports}, ports);
    yield* queue;
  } finally {
    // Release our end of the channel even when postMessage throws or the
    // consumer stops iterating early; otherwise the listening port would
    // stay open.
    mc.port1.close();
  }
}
// main
// Entry point: starts worker.js, prints each chunk it sends back, then
// shuts the worker down.
(async function main() {
  const w = new Worker(path.resolve(__dirname, "./worker.js"));
  try {
    for await (const chunk of send(w, "Hello ES2018 World")) {
      console.log(chunk);
    }
  } finally {
    // Always stop the worker thread, even if the loop above throws;
    // a running worker would otherwise keep the process alive.
    await w.terminate();
  }
})().catch(console.error);
// node.js worker_threads example: worker side code
// NOTE: in main, parentPort is null
const {parentPort} = require("worker_threads");
// Request handler: the incoming message is shaped like a DOM MessageEvent,
// carrying the payload in `data` and the reply port in `ports[0]`.
parentPort.on("message", (request) => {
  const words = request.data.split(/\s+/);
  const replyPort = request.ports[0];
  // Answer with one MessageEvent-shaped message per whitespace-separated word.
  words.forEach((word) => {
    replyPort.postMessage({data: word, ports: []});
  });
  // Closing the port tells the requester that the response is complete.
  replyPort.close();
});
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment