Skip to content

Instantly share code, notes, and snippets.

@muromec
Last active February 12, 2023 20:01
Show Gist options
  • Save muromec/e09dc075ad7446057e90e71215ac4445 to your computer and use it in GitHub Desktop.
Save muromec/e09dc075ad7446057e90e71215ac4445 to your computer and use it in GitHub Desktop.
/**
 * Builds a launcher for a generator-driven "process".
 *
 * The launcher starts `fn` (wrapped by `watchExit`) as a generator, sends it an
 * `INIT` message, announces the new process to its parent with a `HI` message
 * carrying `subscribe`, and returns the process handle.
 *
 * Every message delivered to the generator arrives as `{ state, reply, msg }`;
 * whatever the generator yields next becomes the new state. Once the generator
 * has finished, the last yielded state is kept.
 *
 * @param {Function} fn - Generator function invoked as `fn(process, ...args)`.
 * @param {string} pname - Name stamped on every message forwarded to the parent.
 * @param {Function} [toParent] - Receives messages addressed to the parent;
 *   defaults to a no-op for a root process.
 * @returns {Function} `(...args) => process`, where `process` exposes
 *   `pname`, `send`, `getState`, `subscribe`, `fork` and `toParent`.
 */
function spawn(fn, pname, toParent = () => null) {
  // Generator currently driving this process; assigned when the launcher runs.
  let current = null;
  // Last value yielded by the generator.
  let state = null;
  const listeners = [];
  // Messages queued through `process.toParent` during a step; flushed by `tick`.
  const buffer = [];

  function getState() {
    return state;
  }

  function subscribe(listener) {
    listeners.push(listener);
  }

  // `to` falls back to this process's own generator. The previous default
  // referenced an undeclared identifier and threw a ReferenceError whenever a
  // message carried `to: undefined`.
  function send({ to = current, ...msg }) {
    const ret = to.next({ state, reply: toSelf, msg });
    if (!ret.done) {
      state = ret.value;
    }
    tick();
  }

  function tick() {
    listeners.forEach((listener) => listener(state));
    // Drain in FIFO order so the parent sees messages in the order they were
    // emitted (e.g. a result before the EXIT queued right after it).
    while (buffer.length > 0) {
      toParent({ pname, ...buffer.shift() });
    }
  }

  function toSelf(msg) {
    send({ to: current, ...msg });
  }

  // Entry point handed to children as their `toParent`.
  function fromChild(msg) {
    if (msg.type === 'HI') {
      // Re-run our own tick whenever the child ticks.
      return msg.subscribe(tick);
    }
    toSelf(msg);
  }

  function toBuffer(msg) {
    buffer.push(msg);
  }

  function fork(childFn, childName) {
    return spawn(childFn, childName, fromChild);
  }

  return (...args) => {
    const process = { pname, send: toSelf, getState, subscribe, fork, toParent: toBuffer };
    const task = watchExit(fn)(process, ...args);
    current = task;
    // Advance to the first `yield` so the generator can receive messages.
    task.next();
    toSelf({ type: 'INIT' });
    toParent({ type: 'HI', subscribe });
    return process;
  };
}
/**
 * Prepares a launcher that asks `supervisor` to run `fn` as a child process.
 *
 * @param {{send: Function}} supervisor - Process handle that receives the request.
 * @param {Function} fn - Generator function the supervisor should start.
 * @param {string} pname - Name for the process to be started.
 * @returns {Function} `(...args) => void`; each call sends one `RUN` message
 *   carrying `fn`, the call arguments and `pname`.
 */
function attach(supervisor, fn, pname) {
  const requestRun = (...args) => {
    const request = { type: 'RUN', fn, args, pname };
    supervisor.send(request);
  };
  return requestRun;
}
/**
 * Wraps a generator function so that its normal completion is reported.
 *
 * The wrapper delegates to `fn(process, ...args)` and, once that generator
 * has run to completion, queues `{ type: 'EXIT', p: process }` through
 * `process.toParent`.
 *
 * @param {Function} fn - Generator function to wrap.
 * @returns {Function} Generator function with the same call signature as `fn`.
 */
function watchExit(fn) {
  function* reportingTask(process, ...args) {
    const inner = fn(process, ...args);
    yield* inner;
    const notice = { type: 'EXIT', p: process };
    process.toParent(notice);
  }
  return reportingTask;
}
/**
 * Reducer-style message loop for a process generator.
 *
 * Yields the current state (initially `null`), waits for a `{ state, msg }`
 * envelope, logs the message, and computes the next state as
 * `fn(state, msg)` using the state supplied in the envelope. The loop ends
 * as soon as the reducer returns the string `'STOPPED'`.
 *
 * @param {string} name - Label used in the log line.
 * @param {Function} fn - Reducer `(state, msg) => nextState`.
 */
function* runDispatch(name, fn) {
  let current = null;
  for (;;) {
    const { state: given, msg } = yield current;
    console.log('msg', name, ' <- ', msg);
    current = fn(given, msg);
    if (current === 'STOPPED') {
      return;
    }
  }
}
/**
 * Supervisor process: starts child processes on request and tracks them.
 *
 * State is a frozen `{ processes }` object. Message handling:
 *  - `INIT`            resets to an empty process list.
 *  - `RUN`             forks `msg.fn` under `msg.pname` with `msg.args` and
 *                      appends the new process handle.
 *  - `ERROR` / `ABORT` sends `ABORT` to every tracked process.
 *  - `EXIT`            drops `msg.p`; when no process remains the supervisor
 *                      stops by returning `'STOPPED'`.
 *  - `OK`              is passed on through `toParent` unchanged.
 * Anything else leaves the state untouched.
 *
 * @param {{pname: string, toParent: Function, fork: Function}} process
 */
function* supervise({ pname, toParent, fork }) {
  const emptyState = Object.freeze({
    processes: Object.freeze([]),
  });

  const reduce = (state, msg) => {
    switch (msg.type) {
      case 'INIT':
        return emptyState;

      case 'RUN': {
        const started = fork(msg.fn, msg.pname)(...msg.args);
        const processes = Object.freeze([...state.processes, started]);
        return Object.freeze({ ...state, processes });
      }

      case 'ERROR':
      case 'ABORT':
        // One failure (or an explicit abort) cancels every sibling.
        state.processes.forEach((child) => child.send({ type: 'ABORT' }));
        return state;

      case 'EXIT': {
        const remaining = Object.freeze(
          state.processes.filter((child) => child !== msg.p),
        );
        return remaining.length === 0
          ? 'STOPPED'
          : Object.freeze({ ...state, processes: remaining });
      }

      case 'OK':
        toParent(msg);
        return state;

      default:
        return state;
    }
  };

  yield* runDispatch(pname, reduce);
}
/**
 * Process that fetches `url` as text and reports the outcome.
 *
 * The request starts as soon as the generator body first runs. Its result is
 * fed back to the process itself as `OK` (with `text`), `ABORTED` or `ERROR`,
 * always followed by `DONE`. The reducer forwards `OK` / `ABORTED` / `ERROR`
 * to the parent, maps them to the states `'ok'` / `'aborted'` / `'failed'`,
 * aborts the request on `ABORT`, and stops on `DONE`.
 *
 * @param {{pname: string, toParent: Function, send: Function}} process
 * @param {{url: URL}} options - Only `url.href` is read.
 */
function* xfetch({ pname, toParent, send: toSelf }, { url }) {
  const aborter = new AbortController();

  const performRequest = async () => {
    try {
      const response = await fetch(url.href, { signal: aborter.signal });
      const text = await response.text();
      toSelf({ type: 'OK', text });
    } catch (e) {
      if (!(e instanceof DOMException) || e.name !== 'AbortError') {
        console.log('e', e);
        toSelf({ type: 'ERROR', pname });
      } else {
        toSelf({ type: 'ABORTED', pname });
      }
    }
    // Sent after every outcome so the process always terminates.
    toSelf({ type: 'DONE' });
  };
  performRequest();

  const reduce = (state, msg) => {
    switch (msg.type) {
      case 'INIT':
        return 'pending';
      case 'ABORT':
        aborter.abort();
        return state;
      case 'DONE':
        return 'STOPPED';
      case 'ABORTED':
        toParent(msg);
        return 'aborted';
      case 'ERROR':
        toParent(msg);
        return 'failed';
      case 'OK':
        toParent(msg);
        return 'ok';
      default:
        return state;
    }
  };

  yield* runDispatch(pname, reduce);
}
/**
 * Root process of the demo: starts a supervisor, asks it to run one `xfetch`
 * per URL, then reacts to what comes back.
 *
 * State is `{ s }`, where `s` is the supervisor handle (or `null` after an
 * `EXIT` message). `ABORT` is relayed to the supervisor; the first `OK`
 * stops this process.
 *
 * @param {{pname: string, fork: Function}} process
 */
function* main({ pname, fork }) {
  const s = fork(supervise, 'super')();

  const targets = ['https://api.myip.com', 'https://x.myip.com'].map(
    (href) => new URL(href),
  );
  targets.forEach((url) => {
    const launch = attach(s, xfetch, `xfetch ${url.href}`);
    launch({ url });
  });

  const reduce = (state, msg) => {
    switch (msg.type) {
      case 'INIT':
        return { s };
      case 'ABORT':
        s.send({ type: 'ABORT' });
        return state;
      case 'EXIT':
        return { s: null };
      case 'OK':
        return 'STOPPED';
      default:
        return state;
    }
  };

  yield* runDispatch(pname, reduce);
}
// Start the root process. It has no parent, so `spawn` falls back to its
// no-op `toParent`.
const m = spawn(main, 'main')();
// Uncomment to relay an ABORT through the supervisor to every running fetch:
//m.send({ type: 'ABORT', p: null});
// On every tick of the root process, print the state of each supervised child.
m.subscribe(() => {
  const { s } = m.getState();
  if (s) {
    const { processes } = s.getState();
    console.log('processes', processes.map((p) => p.getState()));
  } else {
    console.log('no supervisor found');
  }
});
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment