Skip to content

Instantly share code, notes, and snippets.

@dmsnell
Created August 29, 2020 20:31
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save dmsnell/c0ccf00737c8611f4e8807be601b9d6c to your computer and use it in GitHub Desktop.
Save dmsnell/c0ccf00737c8611f4e8807be601b9d6c to your computer and use it in GitHub Desktop.
50k `async` JS processes
export const counter = async ({send, receive}) => {
let count = 0;
while (1) {
const [action, args] = await receive();
switch (action) {
case 'inc':
count++;
break;
case 'dec':
count--;
break;
case 'add':
count += args;
break;
case 'report':
send(args, count);
break;
}
}
}
const { Worker } = require('worker_threads');
/**
 * Starts a worker thread running './worker-loader.js' and resolves once the
 * worker has answered with its `sender` and `spawner` ports.
 *
 * The resolved handle exposes:
 *   - send(pid, message): posts `[pid, message]` on the sender port
 *   - spawn(path, name, args): asks the worker to spawn a process and resolves
 *     with the pid from the matching reply
 *   - on(f): subscribes `f` to messages arriving on the sender port
 *
 * @param id value handed to the worker as `workerData.id`
 */
const spawnWorker = id => new Promise( ready => {
  const worker = new Worker('./worker-loader.js', { workerData: { id } } );

  worker.on('message', async ({sender, spawner}) => {
    // Spawn requests are numbered so replies can be matched to their request.
    let nextRequestId = 0;

    const spawn = (path, name, args) => new Promise( settle => {
      const requestId = nextRequestId++;
      spawner.postMessage([requestId, path, name, args]);

      function onReply({messageId, pid}) {
        // Replies to other outstanding requests arrive on the same port.
        if ( messageId === requestId ) {
          spawner.removeListener('message', onReply);
          settle(pid);
        }
      }

      spawner.addListener('message', onReply);
    } );

    const send = (pid, message) => sender.postMessage([pid, message]);
    const on = f => sender.on('message', f);

    ready({send, spawn, on});
  });

  // Any first message triggers the worker's bootstrap handler.
  worker.postMessage(null);
} );
/**
 * Small demo: starts two workers, spawns three counter processes
 * (two on worker 'a', one on worker 'b') and sends them a fixed
 * sequence of messages.
 */
const go = async () => {
  // Both workers are started before either one is awaited.
  const [a, b] = await Promise.all([spawnWorker('a'), spawnWorker('b')]);

  // All three spawn requests are issued before any reply is awaited.
  const [counter, counter2, counter3] = await Promise.all([
    a.spawn('./counter.mjs', 'counter', []),
    a.spawn('./counter.mjs', 'counter', []),
    b.spawn('./counter.mjs', 'counter', []),
  ]);

  // [worker, target pid, message] — sent strictly in this order.
  const script = [
    [a, counter, ['inc']],
    [b, counter3, ['add', 4813]],
    [b, counter3, ['report', 33]],
    [a, counter2, ['dec']],
    [a, counter, ['inc']],
    [a, counter, ['report', 15]],
    [a, counter, ['add', 238]],
    [a, counter2, ['report', 83]],
    [a, counter, ['report', 24]],
  ];

  for (const [worker, pid, message] of script) {
    worker.send(pid, message);
  }
};
// go();
// Builds the array [0, 1, ..., n - 1].
const range = n => [...new Array(n).keys()];
/**
 * Stress test: spreads COUNT counter processes over THREADS worker threads.
 *
 * Every counter is told to add 1 and then to report. Each report that comes
 * back through a worker's sender port is added to `total`; once COUNT reports
 * have arrived the total is logged and the process exits.
 *
 * The per-worker spawn loops are collected with `Promise.all` rather than
 * launched from `forEach(async ...)`, so a failure inside any loop rejects
 * the promise returned by `go2` instead of being detached from it.
 */
const go2 = async () => {
  const COUNT = 50000;
  const THREADS = 16;
  const workers = await Promise.all(range(THREADS).map(i => spawnWorker(`worker-${ i }`)));

  let total = 0;
  let count = 0;

  await Promise.all(workers.map(async (worker, i) => {
    // Reports arrive as [pid, message]; only the reported value is needed.
    worker.on(([, message]) => {
      total += message;
      count++;
      if (count >= COUNT) {
        console.log({total});
        process.exit(0);
      }
    });

    // Spawns are awaited one at a time per worker; the workers themselves
    // proceed concurrently.
    for (let j = 0; j < COUNT / THREADS; j++) {
      const counter = await worker.spawn('./counter.mjs', 'counter', []);
      worker.send(counter, ['add', 1]);
      worker.send(counter, ['report', i]);
    }
  }));
};
go2();
import { MessageChannel, MessagePort, Worker } from 'worker_threads';
import type { Pid } from './fidget';
/**
 * Signature of the `receive` function handed to every process.
 *
 * Accepts zero or more selector functions, optionally followed by a finite
 * number used as a timeout. The previous zero-parameter signature did not
 * allow typed callers to pass the selectors and timeout that the
 * implementation reads from its argument list.
 */
type ReceiveFunction = (
  ...selectors: Array<((message: unknown) => unknown) | number>
) => Promise<unknown>;
/**
 * Entry point of a process. `spawn` calls it once with a single
 * argument object.
 */
type ProcessFunction = (args: {
// Resolves with the next message queued for this process.
receive: ReceiveFunction;
// Delivers `message` to the process identified by `pid`.
send: (pid: Pid, message: unknown) => void;
// The pid that was assigned to this process when it was spawned.
self: Pid;
}) => void;
/** Bookkeeping record stored in the process table for each spawned process. */
type Process = {
// Port that `send` posts a `null` wake-up to after queueing a message.
port: MessagePort;
// Messages queued by `send` that the process has not yet received.
queue: unknown[]
};
/** Every spawned process, keyed by its pid. */
const process_table = new Map<Pid, Process>();

/** The most recently issued pid; 0 means none has been issued yet. */
let pid_counter = 0;

/** Issues the next pid. The first call returns 1. */
function next_pid(): Pid {
  pid_counter += 1;
  return pid_counter;
}
/**
 * Creates a new process, registers it in the process table and schedules
 * `runner` to start on a later turn of the event loop.
 *
 * The `receive` function given to the runner accepts optional selector
 * functions, optionally followed by a finite number used as a timeout in
 * milliseconds:
 *   - no selectors (or only a timeout): resolves with the oldest queued message
 *   - with selectors: scans the queue in order and resolves with
 *     `[selectorIndex, message]` for the first message a selector accepts;
 *     non-matching messages stay queued
 *   - on timeout: resolves with `'timeout'` when called with the timeout as
 *     its only argument, `[null, 'timeout']` otherwise
 *
 * @param runner the process body
 * @param sender function handed to the runner as `send`; defaults to the
 *               local `send` when omitted
 * @returns the pid assigned to the new process
 */
export function spawn(
  runner: ProcessFunction,
  sender?: (pid: Pid, message: unknown) => void,
): Pid {
  const pid = next_pid();
  const { port1, port2 } = new MessageChannel();
  // Not called `process`, to avoid shadowing the Node.js global.
  const entry: Process = { port: port1, queue: [] };

  const receive: ReceiveFunction = (...selectors: unknown[]) => {
    const last = selectors[selectors.length - 1];
    const timeout = typeof last === 'number' && Number.isFinite(last) ? last : null;
    // With nothing but an optional timeout, any message is acceptable.
    const takeAny = selectors.length === 0 || (timeout !== null && selectors.length === 1);

    return new Promise<unknown>((resolve) => {
      let timer: ReturnType<typeof setTimeout> | null = null;

      const settle = (value: unknown): void => {
        if (timer !== null) {
          clearTimeout(timer);
        }
        resolve(value);
      };

      const listener = (): void => {
        if (entry.queue.length > 0) {
          if (takeAny) {
            settle(entry.queue.shift());
            return;
          }

          for (let q = 0; q < entry.queue.length; q++) {
            const message = entry.queue[q];
            for (let i = 0; i < selectors.length; i++) {
              const selector = selectors[i];
              // Selectors end at the first non-function (the trailing timeout).
              if (typeof selector !== 'function') {
                break;
              }
              if (selector(message)) {
                entry.queue.splice(q, 1);
                settle([i, message]);
                return;
              }
            }
          }
        }

        // Queue empty or nothing matched: wait for the next wake-up.
        port2.once('message', listener);
      };

      // Compare against null rather than testing truthiness so that a
      // timeout of 0 still arms the timer instead of waiting forever.
      if (timeout !== null) {
        timer = setTimeout(() => {
          port2.removeListener('message', listener);
          resolve(selectors.length > 1 ? [null, 'timeout'] : 'timeout');
        }, timeout);
      }

      listener();
    });
  };

  process_table.set(pid, entry);

  // A failing process is reported rather than silently ignored.
  const reportFailure = (error: unknown): void => {
    console.error('process terminated with an error', { pid, error });
  };

  setTimeout(() => {
    try {
      const outcome: unknown = runner({ receive, send: sender ?? send, self: pid });
      // Runners may be async; a rejection is reported like a synchronous throw.
      Promise.resolve(outcome).catch(reportFailure);
    } catch (error) {
      reportFailure(error);
    }
  }, 0);

  return pid;
}
/**
 * Queues `message` for the local process identified by `pid` and posts a
 * `null` wake-up on that process's port.
 *
 * @param pid     target process id
 * @param message payload appended to the target's queue
 * @returns `false` when no process with that pid is registered, `true` once
 *          the message has been queued (previously the success path returned
 *          nothing, leaving the function with an inconsistent return)
 */
export function send(pid: Pid, message: unknown): boolean {
  const target = process_table.get(pid);
  if (target === undefined) {
    return false;
  }

  target.queue.push(message);
  // The payload travels through the queue; the port message only signals arrival.
  target.port.postMessage(null);
  return true;
}
const { MessageChannel, parentPort, workerData } = require('worker_threads')
const { send: localSend, spawn } = require('./process.js');
const { id } = workerData;
// Worker bootstrap. On the first message from the parent thread, create two
// channels and hand their remote ends back to the parent:
//   - sender:  carries [pid, message] pairs in both directions
//   - spawner: carries spawn requests in and {messageId, pid} replies out
parentPort.once('message', () => {
  const { port1: senderLocal, port2: senderRemote } = new MessageChannel();
  const { port1: spawnerLocal, port2: spawnerRemote } = new MessageChannel();

  // `send` given to processes spawned here: forwards to the parent thread.
  const send = (pid, message) => {
    senderLocal.postMessage([pid, message]);
  };

  // Messages from the parent are delivered to processes local to this worker.
  senderLocal.on('message', ([pid, message]) => {
    localSend(pid, message);
  });

  // Spawn request: load the module, start the named export as a process and
  // reply with the new pid tagged with the request's id.
  spawnerLocal.on('message', async ([messageId, modulePath, functionName]) => {
    const moduleObject = await import(modulePath);
    const pid = spawn(moduleObject[functionName], send);
    spawnerLocal.postMessage({messageId, pid});
  });

  parentPort.postMessage(
    { sender: senderRemote, spawner: spawnerRemote },
    [senderRemote, spawnerRemote]
  );
});
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment