Skip to content

Instantly share code, notes, and snippets.

@jkpl
Last active May 7, 2020 07:27
Show Gist options
  • Star 3 You must be signed in to star a gist
  • Fork 3 You must be signed in to fork a gist
  • Save jkpl/9ef31e82dea59be2be7f to your computer and use it in GitHub Desktop.
Save jkpl/9ef31e82dea59be2be7f to your computer and use it in GitHub Desktop.
Free monad based thread simulation and FRP constructs written in JavaScript. http://stuff.lepovirta.org/r/freefrp/
// Free monad based thread simulation and FRP constructs written in JavaScript
// First, we need some way to express lazy values and actions.
// We can use zero-argument functions for this purpose: call the function and
// you get the value. We also need to compose lazy values/actions. For that
// we have bindLazy function. Lazy values are not expected to be pure
// in this program: evaluating a lazy value/action at different times can produce
// a different value.
// (() -> a) -> (a -> (() -> b)) -> (() -> b)
function bindLazy(value, f) {
return function() {
return f(value())();
};
}
// Threads can be simulated with the help of lazy values. For that purpose
// they need their own set of instructions and composition rules based
// on those instructions. One way to simulate threads is base the design
// on free monads and free monad transformers.
//
// * http://www.haskellforall.com/2013/06/from-zero-to-cooperative-threads-in-33.html
// * http://hackage.haskell.org/package/transformers-free-1.0.0/docs/src/Control-Monad-Trans-Free.html
function makeFree(pure, value) {
return { pure: pure, value: value };
}
function pure(value) { return makeFree(true, value); }
function roll(functor) { return makeFree(false, functor); }
// The free monad transformer doesn't need to be fully implemented
// in order to be able to simulate threads. Only the monadic bind and
// functor mapping functionality need to be extracted. Instead of using
// a special type for expressing threads, these operations work on
// lazy values (zero-arity functions).
// The monadic bind for threads
function coBind(lazyValue, f) {
return bindLazy(lazyValue, function(free) {
return free.pure
? f(free.value)
: wrap(instructionMap(free.value, function(v) { return coBind(v, f); }));
});
}
// The functor map for threads
function coMap(lazyValue, f) {
return coBind(lazyValue, function(v) {
return makeThread(f(v));
});
}
// Equivalent of FreeT's "return" method.
function makeThread(value) {
return function() { return pure(value); };
}
function wrap(instruction) {
return function() { return roll(instruction); };
}
// Makes any functor (any object that contains "map" method) into a thread
function liftF(instruction) {
return wrap(instructionMap(instruction, makeThread));
}
// Makes any lazy value into a thread
function lift(lazyValue) {
return bindLazy(lazyValue, makeThread);
}
// Thread flow control instructions. Steps come in three forms:
// * Yield: Tell the scheduler yield the next execution step
// * Fork: Split the execution to two paths
// * Done: End the execution and don't allow extending with additional steps
//
// A step also contains a list of next steps.
// Yields contain one step, fork contains two, and done contains none.
function makeInstruction(mode, next) {
return { mode: mode, next: next };
}
function isYield(instruction) { return instruction.mode === 'yield'; }
function isFork(instruction) { return instruction.mode === 'fork'; }
function isDone(instruction) { return instruction.mode === 'done'; }
function instructionMap(instruction, f) {
return makeInstruction(instruction.mode, instruction.next.map(f));
}
function yield() { return liftF(makeInstruction('yield', [null] )); }
function done() { return liftF(makeInstruction('done', [] )); }
function cFork() { return liftF(makeInstruction('fork', [false, true] )); }
// Creates a computation which evaluates the given lazy value,
// yields the execution, and returns the final value.
function atom(lazyValue) {
return coBind(lift(lazyValue), function(v) {
return coBind(yield(), function() {
return makeThread(v);
});
});
}
// Returns the given thread if the predicate holds true.
// Otherwise returns an empty value.
function when(p, routine) {
return p ? routine : makeThread(null);
}
// Forks another thread so that it runs alongside the currently
// executing thread.
function fork(routine) {
return coBind(cFork(), function(child) {
return when(child, coBind(routine, function() {
return done();
}));
});
}
// Evaluate the given lazy value until it produces
// something other than null.
function retry(lazyvalue) {
return coBind(atom(lazyvalue), function(v) {
return v === null ? retry(lazyvalue) : makeThread(v);
});
}
// In order to use the simulated threads, a custom scheduler needs to be built
// for them:
//
// * Start by processing the given array of threads until the array is exhausted.
// * Processing is started from the head of the array.
// * Evaluate the next thread (execute it) and process the resulting instruction.
// * New threads are enqueued to the thread array.
// * The loop keeps track of how many steps it has processed, and yields its turn
// to the JS event system after a certain number of steps have been processed:
// This allows the environment to process browser events etc.
function run_(initialStepCount, threads) {
var stepCount = initialStepCount;
while(threads.length > 0 && stepCount > 0) {
var thread = threads.shift();
var freeValue = thread();
if (!freeValue.pure) {
var instruction = freeValue.value;
var next = instruction.next;
if (isYield(instruction)) {
threads.push.apply(threads, next);
} else if (isFork(instruction)) {
threads.unshift(next[0]);
threads.push.apply(threads, next.slice(1));
}
}
stepCount -= 1;
}
if (threads.length > 0) {
delay(function() { run_(initialStepCount, threads); });
}
}
function run(startThread) {
run_(20, [startThread]);
}
function delay(action) {
setTimeout(action, 0, []);
}
// Channels: channels are used for data synchronization between threads.
// Here channels implemented using JS arrays.
// A channel is essentially a FIFO queue.
// A multichannel is a broadcast channel for other channels: enqueuing a value
// to multichannel enqueues the value to all of its subscribing channels.
function makeMultiChannel() {
return [];
}
function makeChannel() {
return [];
}
function subscribe(multiChannel) {
var channel = makeChannel();
multiChannel.push(channel);
return channel;
}
function enqueue(channel, value) {
channel.push(value);
}
function multiEnqueue(multiChannel, value) {
multiChannel.forEach(function(channel) { enqueue(channel, value) });
}
function dequeue(channel) {
return channel.length > 0 ? channel.shift() : null;
}
// Receive is an atomic thread action for reading a channel.
// If the channel doesn't contain a value, the receive action yields its
// execution and tries to read the channel again when it gets its turn the next time.
function receive(channel) {
return retry(function() { return dequeue(channel) });
}
// Basic FRP signals.
// These FRP signals follow (some of) the semantics presented in the paper
// "Asynchronous Functional Reactive Programming for GUIs".
// http://people.seas.harvard.edu/~chong/pubs/pldi13-elm.pdf
var idGenerator = (function() {
var i = 0;
return function() {
return i++;
};
}());
// Every source channel will always receive a change notification
// whether or not they change. In order to avoid unnecessary computations,
// signals can produce no-change values to inform the subscribing signals
// that their values haven't changed.
function Change(hasChanged, body) {
this.hasChanged = hasChanged;
this.body = body;
}
function change(body) { return new Change(true, body); }
function noChange(body) { return new Change(false, body); }
function Signal(firstValue, multiChannel) {
this.firstValue = firstValue;
this.multiChannel = multiChannel;
this.subscribe = function() {
return subscribe(multiChannel);
};
}
function threadSignal(firstValue, multiChannel) {
return makeThread(new Signal(firstValue, multiChannel));
}
// 1. Execute the given action to get a new message
// 2. Send the message to given channel
// 3. Recurse
function sendLoop(channel, previousValue, action) {
return coBind(action(previousValue), function(msg) {
multiEnqueue(channel, msg);
return sendLoop(channel, msg.body, action);
});
}
// Subscriptions and channels can be setup outside of the thread system, but
// channel reading has to be done in the thread system.
function eventSignal(eventChannel, signalId, valueSource, firstValue) {
var output = makeMultiChannel();
var events = subscribe(eventChannel);
var input = subscribe(valueSource);
var loop = fork(sendLoop(output, firstValue, function(previousValue) {
return coBind(receive(events), function(eventId) {
if (signalId === eventId) {
return coMap(receive(input), change);
} else {
return makeThread(noChange(previousValue));
}
});
}));
return coBind(loop, function() {
return threadSignal(firstValue, output);
});
}
// liftN and folp expect the given function to produce a thread.
// This allows those functions to be made interleavable.
function lift1(f, signal) {
var output = makeMultiChannel();
var input = signal.subscribe();
return coBind(f(signal.firstValue), function(firstValue) {
var loop = fork(sendLoop(output, firstValue, function(previousValue) {
return coBind(receive(input), function(msg1) {
if (msg1.hasChanged) {
return coMap(f(msg1.body), change);
} else {
return makeThread(noChange(previousValue));
}
});
}));
return coBind(loop, function() {
return threadSignal(firstValue, output);
});
});
}
function lift2(f, signal1, signal2) {
var output = makeMultiChannel();
var input1 = signal1.subscribe();
var input2 = signal2.subscribe();
return coBind(f(signal1.firstValue, signal2.firstValue), function(firstValue) {
var loop = fork(sendLoop(output, firstValue, function(previousValue) {
return coBind(receive(input1), function(msg1) {
return coBind(receive(input2), function(msg2) {
if (msg1.hasChanged || msg2.hasChanged) {
return coMap(f(msg1.body, msg2.body), change);
} else {
return makeThread(noChange(previousValue));
}
});
})
}));
return coBind(loop, function() {
return threadSignal(firstValue, output);
});
});
}
function foldp(f, firstValue, signal) {
var output = makeMultiChannel();
var input = signal.subscribe();
var loop = fork(sendLoop(output, firstValue, function(acc) {
return coBind(receive(input), function(msg) {
if (msg.hasChanged) {
return coMap(f(msg.body, acc), change);
} else {
return makeThread(noChange(acc));
}
});
}));
return coBind(loop, function() {
return threadSignal(firstValue, output);
});
}
function async(eventChannel, signal) {
var output = makeMultiChannel();
var input = signal.subscribe();
var signalId = idGenerator();
var loop = coBind(receive(input), function(msg) {
if (msg.hasChanged) {
multiEnqueue(output, msg.body);
multiEnqueue(eventChannel, signalId);
return loop;
} else {
return loop;
}
});
return coBind(fork(loop), function() {
return eventSignal(eventChannel, signalId, output, signal.firstValue);
});
}
// Execute a callback for each new signal value.
function signalForeach(signal, callback) {
var input = signal.subscribe();
var loop = coBind(receive(input), function(msg) {
var action = atom(function() { callback(msg.body); });
return coBind(when(msg.hasChanged, action), function() {
return loop;
});
});
return fork(loop);
}
// Utilities
function noAction() {
return atom(function() { return null; });
}
function numberRange(from, to) {
var increment = (to - from) / Math.abs(to - from);
var comparer = increment > 0
? function(v) { return v <= to; }
: function(v) { return v >= to; };
var acc = [];
for (var i = from; comparer(i); i += increment) {
acc.push(i);
}
return acc;
}
// Does each action in the given array sequentially.
function doActions(actions) {
if (actions.length > 1) {
return coBind(actions[0], function() {
return doActions(actions.slice(1));
});
} else if (actions.length == 1) {
return actions[0];
} else {
return noAction();
}
}
// For each item in the given item array, execute an action.
function forEachDoAction(items, actionGen) {
var actions = items.map(function(i) { return actionGen(i); });
return doActions(actions);
}
function printSignal(prefix, signal) {
return signalForeach(signal, function(v) { console.log(prefix + ': ' + v); });
}
function print(v) {
return atom(function() {
console.log(v);
return v;
});
};
// Make the result of the given function atomic.
function atomize(f) {
return function() {
var args = arguments;
return atom(function() {
return f.apply(null, args);
});
};
}
// Generate empty actions before executing the given action.
// This can be used for simulating long delays between the start of the
// execution and the final result.
function delayAction(steps, action) {
var actions = numberRange(1, steps).map(function() { return noAction(); });
actions.push(action);
return doActions(actions);
}
// Sample program
// times 10
var t10 = atomize(function(v) {
return v * 10;
});
// plus 1 (delayed)
var p1 = function(v) {
return delayAction(1000, atom(function() {
return v + 1;
}));
};
// previous value + next value
var accumulate = atomize(function(next, acc) {
return next + acc;
});
// two values paired
var pair = atomize(function(v1, v2) {
return [v1, v2];
});
// This program contains the following setup:
// * an event signal as the top most signal
// * acc = foldp (+) 0 eventSignal
// * times10 = lift1 (* 10) eventSignal
// * slowSignal = lift1 slowPlus1 times10
// * asyncSlowSignal = async slowSignal
// * paired = lift2 pair times10 asyncSlowSignal
//
// The program writes numbers [1..10] to the event signal.
var program = function() {
var eventId = idGenerator();
var eventChannel = makeMultiChannel();
var eventSignalChannel = makeMultiChannel();
return coBind(eventSignal(eventChannel, eventId, eventSignalChannel, 0), function(sig1) {
return coBind(foldp(accumulate, 0, sig1), function(acc) {
return coBind(lift1(t10, sig1), function(times10) {
return coBind(lift1(p1, times10), function(slowSignal) {
return coBind(async(eventChannel, slowSignal), function(asyncSlowSignal) {
return coBind(lift2(pair, times10, asyncSlowSignal), function(paired) {
return doActions([
printSignal('acc ', acc),
printSignal('times 10 ', times10),
printSignal('paired ', paired),
fork(forEachDoAction(numberRange(1, 10), function(n) {
multiEnqueue(eventSignalChannel, n);
multiEnqueue(eventChannel, eventId);
return print('sent ' + n);
}))
]);
});
});
});
});
});
});
};
// Browser demo
function browserEventSignal(eventChannel, target, eventType, callbackArg) {
var signalId = idGenerator();
var eventSignalChannel = makeMultiChannel();
var callback = typeof callbackArg === 'function'
? callbackArg
: function (v) { return v; }
target.addEventListener(eventType, function(e) {
multiEnqueue(eventChannel, signalId);
multiEnqueue(eventSignalChannel, callback(e));
}, false);
return eventSignal(eventChannel, signalId, eventSignalChannel, callback(null));
}
function getCoordinates(e) {
if (e) {
return [e.screenX, e.screenY];
} else {
return [0, 0];
}
}
var sumCoordinates = atomize(function(next, acc) {
return acc + next[0] + next[1];
});
var combineCoordinates = atomize(function(xy, acc) {
return [xy[0], xy[1], acc];
});
var reduceSize = atomize(function(xy) {
return [xy[0] - 100, xy[1] - 100];
});
function mouseClickSignal(eventChannel) {
return browserEventSignal(eventChannel, window, 'click', getCoordinates);
}
function mouseMoveSignal(eventChannel) {
return browserEventSignal(eventChannel, window, 'mousemove', getCoordinates);
}
function setData(element, v) {
var text = ['{ x: ', v[0], ', y: ', v[1], ', fibonacci: ', v[2], ' }'].join(' ');
outputarea.textContent = text;
}
function setBoxSize(element, v) {
element.style.width = v[0] + 'px';
element.style.height = v[1] + 'px';
}
function boundFib(value) {
return naiveFibonacci(value % 20);
}
function naiveFibonacci(n) {
if (n <= 1) {
return atom(function() { return 1; });
} else {
return coBind(naiveFibonacci(n - 1), function(left) {
return coBind(naiveFibonacci(n - 2), function(right) {
return atom(function() { return left + right; });
});
});
}
}
var browserProgram = function() {
var eventChannel = makeMultiChannel();
var outputarea = document.getElementById('outputarea');
var bluearea = document.getElementById('bluearea');
return coBind(mouseMoveSignal(eventChannel), function(mouseMove) {
return coBind(mouseClickSignal(eventChannel), function(mouseClick) {
return coBind(foldp(sumCoordinates, 0, mouseClick), function(sumCoord) {
return coBind(lift1(boundFib, sumCoord), function(fib) {
return coBind(async(eventChannel, fib), function(asyncFib) {
return coBind(lift2(combineCoordinates, mouseClick, asyncFib), function(combineCoord) {
return coBind(lift1(reduceSize, mouseMove), function(reducedSize) {
return doActions([
signalForeach(combineCoord, function(v) { setData(outputarea, v); }),
signalForeach(reducedSize, function(v) { setBoxSize(bluearea, v); })
]);
});
});
});
});
});
});
});
};
document.addEventListener('DOMContentLoaded', function() {
run(browserProgram());
}, false);
// Uncomment to run the other demo.
//run(program());
// run((function () {
// return coBind(fork(coBind(naiveFibonacci(9), function(v) {
// return print('fibonacci result: ' + v);
// })), function() {
// return fork(forEachDoAction(numberRange(1, 1000), function(n) {
// return print('counter: ' + n);
// }));
// });
// }()));
@jkpl
Copy link
Author

jkpl commented Nov 12, 2014

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment