Skip to content

Instantly share code, notes, and snippets.

@pr1sm
Last active May 8, 2019 19:25
Show Gist options
  • Save pr1sm/d854afc39d054c782bf5d98c0056a704 to your computer and use it in GitHub Desktop.
Save pr1sm/d854afc39d054c782bf5d98c0056a704 to your computer and use it in GitHub Desktop.
Multi Level Workers (Node.js)
/* eslint-disable import/no-unresolved */
const path = require('path');
const { Worker } = require('worker_threads');
/**
 * Spawns the level-1 worker, sends it the initial batch of numbers and waits
 * for the worker's `__done` message.
 *
 * Every message received from the worker is logged. Once the `__done` message
 * arrives the worker is terminated and its `result` is logged.
 *
 * @returns {Promise<void>} Resolves after the final result has been logged.
 *   Rejects if the worker emits `error`, or exits before sending `__done`.
 */
async function start() {
  const worker1 = new Worker(path.resolve(__dirname, './level1.js'));

  const outcome = new Promise((resolve, reject) => {
    // Tracks whether the promise was already resolved/rejected, so the `exit`
    // event that follows `terminate()` or an `error` is not reported twice.
    let isSettled = false;

    worker1.on('message', payload => {
      console.log('[Base] received from thread %d: %j', worker1.threadId, payload);
      if (payload.target === 'main' && payload.event === '__done') {
        isSettled = true;
        worker1.terminate();
        resolve(payload.result);
      }
    });

    worker1.on('error', err => {
      isSettled = true;
      reject(err);
    });

    // Without this, a worker that stops before reporting a result would leave
    // the promise pending forever and the process would end silently.
    worker1.on('exit', code => {
      if (!isSettled) {
        isSettled = true;
        reject(new Error(`level1 worker exited with code ${code} before sending __done`));
      }
    });
  });

  worker1.postMessage({
    target: 'child',
    event: '__start',
    args: [[0, 1, 2, 3, 4]],
  });

  const processed = await outcome;
  console.log('[Base] finished processing with result: %d', processed);
}

// Never leave the promise floating: report the failure and signal it through
// the exit code.
start().catch(err => {
  console.error('[Base] processing failed:', err);
  process.exitCode = 1;
});
▶ node --experimental-worker .\base.js
[Level1] received numbers to process: [0,1,2,3,4]
[Level2] processing number: 0
[Level1] received payload from thread 2: {"__done":true,"val":4}
[Level2] processing number: 1
[Level1] received payload from thread 3: {"__done":false,"val":0}
[Level1] received payload from thread 3: {"__done":true,"val":5}
[Level2] processing number: 2
[Level1] received payload from thread 4: {"__done":false,"val":0}
[Level1] received payload from thread 4: {"__done":false,"val":1}
[Level1] pushing new data to process: 5
[Level1] received payload from thread 4: {"__done":true,"val":6}
[Level2] processing number: 3
[Level1] received payload from thread 5: {"__done":false,"val":0}
[Level1] received payload from thread 5: {"__done":false,"val":1}
[Level1] received payload from thread 5: {"__done":false,"val":2}
[Level1] received payload from thread 5: {"__done":true,"val":7}
[Level2] processing number: 4
[Level1] received payload from thread 6: {"__done":false,"val":0}
[Level1] received payload from thread 6: {"__done":false,"val":1}
[Level1] received payload from thread 6: {"__done":false,"val":2}
[Level1] received payload from thread 6: {"__done":false,"val":3}
[Level1] received payload from thread 6: {"__done":true,"val":8}
[Level2] processing number: 5
[Level1] received payload from thread 7: {"__done":false,"val":0}
[Level1] received payload from thread 7: {"__done":false,"val":1}
[Level1] received payload from thread 7: {"__done":false,"val":2}
[Level1] received payload from thread 7: {"__done":false,"val":3}
[Level1] received payload from thread 7: {"__done":false,"val":4}
[Level1] received payload from thread 7: {"__done":true,"val":9}
[Base] received from thread 1: {"target":"main","event":"__done","result":39}
[Base] finished processing with result: 39
/* eslint-disable import/no-unresolved */
/* eslint-disable no-await-in-loop */
const path = require('path');
const { parentPort, Worker } = require('worker_threads');
/**
 * Drains a queue of numbers one at a time, running each number on its own
 * level-2 worker, and returns the sum of the workers' final values.
 *
 * The queue starts as a copy of `nums`. A timer pushes the extra value `5`
 * onto the queue after 1500 ms to simulate data arriving while processing is
 * under way.
 *
 * @param {number[]} nums - Initial numbers to process; the array is copied,
 *   not mutated.
 * @returns {Promise<number>} Sum of the `val` fields of every worker's
 *   `__done` message. Rejects if a level-2 worker exits before sending one.
 */
async function _start(nums) {
  console.log('[Level1] received numbers to process: %j', nums);
  const numsToProcess = [...nums];
  const processedNums = [];

  // Simulate new incoming data to process. The value is only picked up if the
  // queue is still being drained when the timer fires.
  setTimeout(() => {
    console.log('[Level1] pushing new data to process: 5');
    numsToProcess.push(5);
  }, 1500);

  while (numsToProcess.length !== 0) {
    const num = numsToProcess.shift();
    const worker2 = new Worker(path.resolve(__dirname, './level2.js'), {
      workerData: { args: [num] },
    });

    const result = await new Promise((resolve, reject) => {
      let isDone = false;

      // One listener both logs every payload and detects the final one.
      worker2.on('message', payload => {
        console.log('[Level1] received payload from thread %d: %j', worker2.threadId, payload);
        if (payload.__done) {
          isDone = true;
          worker2.terminate();
          resolve(payload.val);
        }
      });

      // A worker that stops without reporting `__done` would otherwise leave
      // this promise pending forever and stall the whole queue.
      // NOTE: `error` is intentionally left without a listener so that it
      // keeps surfacing as an uncaught exception in this thread.
      worker2.on('exit', code => {
        if (!isDone) {
          reject(new Error(`level2 worker exited with code ${code} before sending __done`));
        }
      });
    });

    processedNums.push(result);
  }

  return processedNums.reduce((accum, val) => accum + val, 0);
}
/**
 * Wires this worker to its parent: listens on `parentPort` and, for every
 * message addressed to `'child'` with event `'__start'`, runs `_start` on the
 * first argument and posts the outcome back as a `'__done'` message addressed
 * to `'main'`. Any other message is ignored.
 *
 * @returns {Promise<void>}
 */
async function start() {
  const handleMessage = async payload => {
    const isStartRequest = payload.target === 'child' && payload.event === '__start';
    if (!isStartRequest) {
      return;
    }

    const [firstArg] = [payload.args[0]];
    const result = await _start(firstArg);
    parentPort.postMessage({ target: 'main', event: '__done', result });
  };

  parentPort.on('message', handleMessage);
}

start();
/* eslint-disable import/no-unresolved */
/* eslint-disable no-await-in-loop */
const { parentPort, workerData } = require('worker_threads');
/**
 * Reports progress for `num` steps and then yields the final value.
 *
 * For each step `0 .. num - 1` a `{ __done: false, val: step }` message is
 * posted to the parent, followed by a 500 ms pause. When `num` is zero or
 * negative no message is posted.
 *
 * NOTE: inside this module the function name shadows Node's global `process`.
 *
 * @param {number} num - Number of progress steps to emit.
 * @returns {Promise<number>} `num + 4`.
 */
async function process(num) {
  console.log('[Level2] processing number: %d', num);

  const pause = ms => new Promise(wake => setTimeout(wake, ms));

  let step = 0;
  while (step < num) {
    parentPort.postMessage({ __done: false, val: step });
    await pause(500);
    step += 1;
  }

  return num + 4;
}
// Worker entry point: take the number handed over through `workerData.args`,
// run it through `process`, and post the final value to the parent as a
// `__done` message.
const num = workerData.args[0];
(async () => {
  const val = await process(num);
  parentPort.postMessage({ __done: true, val });
})();
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment