Skip to content

Instantly share code, notes, and snippets.

@dmsnell
Last active August 27, 2020 21:56
Show Gist options
  • Save dmsnell/a4cebf052a110d49a02719ca2b63006e to your computer and use it in GitHub Desktop.
Save dmsnell/a4cebf052a110d49a02719ca2b63006e to your computer and use it in GitHub Desktop.
Process Primitives
const { spawn, send } = require('./process.js')
// Example process: owns a running total and reacts to the messages it is sent.
//   'inc'        -> add one and print the new total
//   'dec'        -> subtract one and print the new total
//   ['add', n]   -> add n and print the new total
// Any other message is consumed and ignored.
const counter = spawn( async ( receive ) => {
    let total = 0;

    for ( ;; ) {
        const incoming = await receive();

        switch ( incoming ) {
            case 'inc':
                console.log( ++total );
                continue;

            case 'dec':
                console.log( --total );
                continue;
        }

        // Not one of the plain string commands; check for the tuple form.
        if ( ! Array.isArray( incoming ) || incoming[ 0 ] !== 'add' ) {
            continue;
        }

        total += incoming[ 1 ];
        console.log( total );
    }
} );
// Drive the counter process: three increments, one decrement, then a bulk add.
for ( const command of [ 'inc', 'inc', 'inc', 'dec', [ 'add', 1248 ] ] ) {
    send( counter, command );
}
"use strict";
// Compiler-emitted CommonJS prologue.
// Flags this module as having been written with ES-module syntax
// (presumably consumed by import-interop helpers in other compiled modules).
exports.__esModule = true;
// Create the `spawn` and `send` export slots up front, initialised to
// undefined; the real functions are assigned to them further down.
exports.send = exports.spawn = void 0;
var worker_threads_1 = require("worker_threads");
// Registry of spawned processes, keyed by pid.
var process_table = new Map();

// Last pid handed out; 0 means nothing has been spawned yet.
var pid_counter = 0;

/**
 * Allocates the next process id.
 *
 * Ids start at 1 and increase by one on every call.
 */
function next_pid() {
    pid_counter += 1;
    return pid_counter;
}
exports.spawn = function (runner) {
var pid = next_pid();
var _a = new worker_threads_1.MessageChannel(), port1 = _a.port1, port2 = _a.port2;
var process = {
port: port1,
queue: []
};
var receive = function () {
var selectors = arguments;
var timeout = selectors.length > 0 && Number.isFinite(selectors[selectors.length - 1])
? selectors[selectors.length - 1]
: null;
return new Promise(function (resolve) {
var listener;
var timer = timeout && setTimeout(function () {
port2.removeListener('message', listener);
resolve(selectors.length > 1 ? [null, 'timeout'] : 'timeout');
}, timeout);
listener = function () {
if (process.queue.length <= 0) {
port2.once('message', listener);
return;
}
var message = process.queue[process.queue.length - 1];
// with no selectors we can return the message itself
if (selectors.length === 0 || (timeout !== null && selectors.length === 1)) {
process.queue.pop();
clearTimeout(timer);
resolve(message);
return;
}
// otherwise we have to check if the message is selected
for (var q = process.queue.length - 1; q >= 0; q--) {
var message_1 = process.queue[q];
for (var i = 0; i < selectors.length && 'function' === typeof selectors[i]; i++) {
var selector = selectors[i];
if (selector(message_1)) {
process.queue.splice(q, 1);
clearTimeout(timer);
resolve([i, message_1]);
return;
}
}
}
// if we got here then we had no selector match
// so we'll have to wait for the next one
port2.once('message', listener);
};
listener();
});
};
process_table.set(pid, process);
setTimeout(function () {
try {
runner(receive);
}
catch (e) {
// pass
}
}, 0);
return pid;
};
exports.send = function (pid, message) {
var process = process_table.get(pid);
if (!process) {
return false;
}
process.queue.push(message);
process.port.postMessage(null);
};
import { MessageChannel, MessagePort } from 'worker_threads';
import type { Pid } from './fidget';
/**
 * Reads the next message from a process's mailbox.
 *
 * Accepts zero or more selector predicates, optionally followed by a timeout
 * in milliseconds:
 *   - no selectors: resolves with the message itself, or 'timeout'
 *   - with selectors: resolves with [selectorIndex, message], or [null, 'timeout']
 */
type ReceiveFunction = (
    ...args: Array<((message: unknown) => unknown) | number>
) => Promise<unknown>;

/** Body of a spawned process; it is handed the `receive` function for its own mailbox. */
type ProcessFunction = (receive: ReceiveFunction) => void;

/** Bookkeeping kept for each spawned process. */
type Process = {
    /** Port that `send` pings to signal that a message has been queued. */
    port: MessagePort;
    /** Messages that have been sent but not yet received. */
    queue: unknown[];
};
// Registry of spawned processes, keyed by pid.
const process_table: Map<Pid, Process> = new Map();

// Last pid handed out; 0 means nothing has been spawned yet.
let pid_counter = 0;

/**
 * Allocates the next process id.
 *
 * Ids start at 1 and increase by one on every call.
 */
function next_pid(): Pid {
    pid_counter += 1;
    return pid_counter;
}
/**
 * Spawns a lightweight "process": registers a mailbox under a fresh pid and
 * schedules `runner(receive)` to start on a later turn of the event loop.
 *
 * `receive` is the process's only way to read its mailbox:
 *   receive()                    -> resolves with the oldest queued message
 *   receive(timeout)             -> same, or 'timeout' after `timeout` ms
 *   receive(sel1, sel2, ...)     -> resolves with [index, message] for the
 *                                   oldest message accepted by a selector
 *   receive(sel1, ..., timeout)  -> same, or [null, 'timeout'] after `timeout` ms
 *
 * NOTE(review): nothing visible here removes the entry from `process_table`
 * when the runner finishes — confirm whether cleanup happens elsewhere.
 *
 * @param runner Function invoked with `receive`; may be sync or async.
 * @returns The pid to pass to `send`.
 */
export const spawn = (runner: ProcessFunction): Pid => {
    const pid = next_pid();
    const { port1, port2 } = new MessageChannel();
    // `send` pushes onto `queue` and pings `port`; those pings arrive on port2.
    // (Named `mailbox` rather than `process` to avoid shadowing the Node global.)
    const mailbox: Process = {
        port: port1,
        queue: []
    };

    const receive: ReceiveFunction = (...args: unknown[]): Promise<unknown> => {
        const last = args[args.length - 1];
        // A finite number in the last position is the timeout in milliseconds.
        const timeout = typeof last === 'number' && Number.isFinite(last) ? last : null;
        // No selectors at all (optionally just a timeout): any message will do.
        const takeAny = args.length === 0 || (timeout !== null && args.length === 1);

        return new Promise<unknown>((resolve) => {
            let timer: ReturnType<typeof setTimeout> | undefined;

            const listener = (): void => {
                if (mailbox.queue.length === 0) {
                    // Nothing queued (or a stale ping): wait for the next send.
                    port2.once('message', listener);
                    return;
                }

                if (takeAny) {
                    clearTimeout(timer);
                    // Deliver in send order: oldest message first.
                    resolve(mailbox.queue.shift());
                    return;
                }

                // Selective receive: scan from the oldest message, trying the
                // selectors in argument order for each one.
                for (let q = 0; q < mailbox.queue.length; q++) {
                    const message = mailbox.queue[q];
                    for (let i = 0; i < args.length; i++) {
                        const selector = args[i];
                        if (typeof selector !== 'function') {
                            // Selectors end at the first non-function (the timeout).
                            break;
                        }
                        if (selector(message)) {
                            mailbox.queue.splice(q, 1);
                            clearTimeout(timer);
                            resolve([i, message]);
                            return;
                        }
                    }
                }

                // No queued message matched; try again after the next send.
                port2.once('message', listener);
            };

            // Compare against null so that a timeout of 0 still arms the timer
            // and an empty mailbox resolves with 'timeout' instead of hanging.
            if (timeout !== null) {
                timer = setTimeout(() => {
                    port2.removeListener('message', listener);
                    resolve(args.length > 1 ? [null, 'timeout'] : 'timeout');
                }, timeout);
            }

            listener();
        });
    };

    process_table.set(pid, mailbox);

    setTimeout(() => {
        // Running inside a promise executor turns a synchronous throw into a
        // rejection, and resolving with the runner's result adopts an async
        // runner's promise, so both failure modes end up in the same handler.
        new Promise<unknown>((resolve) => {
            resolve(runner(receive));
        }).catch(() => {
            // A crashing process is deliberately not allowed to take down the host.
        });
    }, 0);

    return pid;
};
/**
 * Delivers `message` to the mailbox of the process identified by `pid`.
 *
 * The message itself is appended to the process's in-memory queue; the port
 * only carries a `null` ping that wakes a pending `receive`.
 *
 * @param pid     Id returned by `spawn`.
 * @param message Any value; it is queued as-is.
 * @returns `false` when no process is registered under `pid`, `true` once the
 *          message has been queued.
 */
export const send = (pid: Pid, message: unknown): boolean => {
    const target = process_table.get(pid);
    if (!target) {
        return false;
    }

    target.queue.push(message);
    target.port.postMessage(null);
    return true;
};
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment