Skip to content

Instantly share code, notes, and snippets.

@cowboyd
Created November 29, 2021 10:50
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save cowboyd/5c4b6172222d97442b0fc6698bdb5d4e to your computer and use it in GitHub Desktop.
Save cowboyd/5c4b6172222d97442b0fc6698bdb5d4e to your computer and use it in GitHub Desktop.
Effection Workers
const { once, spawn, main, on, createChannel } = require('effection');
const { Worker } = require('worker_threads');
// Main-thread entry point: start both workers, ask worker2 for its value once
// per second, and tell it to stop once it has answered with x === 5.
main(function* () {
  // worker1 is only started; nothing below talks to it.
  yield createWorker('./worker1.js');
  const { worker: counterThread, messages: counterMessages } =
    yield createWorker('./worker2.js');

  const requestValue = () => {
    counterThread.postMessage({ t: 'getValue' }); // ticker
  };

  yield spawn(function* () {
    const ticker = setInterval(requestValue, 1000);
    try {
      // Bare `yield`: presumably parks this task until it is halted
      // (confirm against Effection's docs); the timer is cleared on the way out.
      yield;
    } finally {
      clearInterval(ticker);
    }
  });

  // Wait for the reply whose payload is { t: 'getValue', x: 5 }.
  yield counterMessages.match({ t: 'getValue', x: 5 }).expect();
  counterThread.postMessage({ t: 'stop' });
  console.log('getValueDone');
});
/**
 * Build a resource description (labels + `init` generator) that wraps a
 * `worker_threads` Worker running the script at `path`.
 *
 * `init` creates the Worker, spawns one task that rethrows the worker's
 * 'error' event and one that logs its exit status and always calls
 * `terminate()`, then waits for a `{ t: 'start' }` message before returning.
 *
 * @param {string} path - script path handed to `new Worker(path)`.
 * @returns {{ labels: { name: string, path: string }, init: Function }}
 *   `init` resolves to `{ worker, messages }`, where `messages` is the
 *   `on(worker, 'message')` stream.
 */
function createWorker(path) {
  const labels = { name: 'Worker', path };

  return {
    labels,
    *init() {
      const worker = new Worker(path);

      // Surface a worker 'error' event by rethrowing it from a child task.
      yield spawn(function* () {
        const failure = yield once(worker, 'error');
        throw failure;
      });

      // Log the exit status when the worker exits; terminate it in every case,
      // including when this task is torn down before 'exit' arrives.
      yield spawn(function* () {
        try {
          const exitStatus = yield once(worker, 'exit');
          console.log(`worker ${path} exited with status: ${exitStatus}`);
        } finally {
          yield worker.terminate();
        }
      });

      // Stream of 'message' events from the worker (the original author noted
      // that `on` works as expected here).
      const messages = on(worker, 'message');

      // Block until the worker announces itself with { t: 'start' }.
      yield messages.match({ t: 'start' }).expect();
      console.log('worker', path, 'is started');

      return { worker, messages };
    },
  };
}
const { on, main, spawn } = require('effection');
const { parentPort } = require('worker_threads');
// worker1 entry point: log every 'message' event received on parentPort and
// announce readiness to the parent by posting { t: 'start' }.
main(function* () {
  // The callback has to declare `message`: with an empty parameter list the
  // log statement referenced an undefined identifier and threw a
  // ReferenceError as soon as the first event was delivered.
  yield spawn(on(parentPort, 'message').forEach(function* (message) {
    console.log(message, 'worker');
  }));
  parentPort.postMessage({ t: 'start' });
});
const { parentPort } = require('worker_threads');
const { main, on } = require('effection');
// Counter handed out in reply to 'getValue' requests; starts at 1 and is
// incremented after every reply.
let x = 1;

// worker2 entry point: announce readiness with { t: 'start' }, then handle
// each 'message' event from the parent according to its `data.t` tag.
main(function* () {
  parentPort.postMessage({ t: 'start' });

  yield on(parentPort, 'message').forEach(function* (value) {
    const payload = value.data;
    console.log('got value:', payload);

    const kind = payload.t;
    if (kind === 'getValue') {
      // Reply with the current counter, then bump it (post-increment).
      parentPort.postMessage({ t: 'getValue', x: x++ });
    } else if (kind === 'stop') {
      console.log('stop');
      // Bare `yield`: presumably suspends this handler until the task is
      // halted (confirm against Effection's docs).
      yield;
      return;
    }
    // Any other tag is ignored.
  });
});
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment