-
-
Save ashnur/f2fb2cf230d47aea9a63123aedb5926a to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const compareAsc = require('date-fns/compare_asc') | |
const { remove } = require('transducers-js') | |
const { take, repeatTake, go, put, chan } = require('medium') | |
// Shared module state. All three arrays are mutated in place by the functions
// below, so they must keep their identity (never reassign them).

// Services registered through loadServices; the mixer loop forwards messages
// to every entry whose `listens` values include the message address.
const services = []
// First history entries ({ address, action, ts }) pushed by send/command;
// exposed read-only as messageHelper.filters.
const filters = []
// Guarded cleanup callbacks added by messageHelper.registerCleanup.
const cleanups = []
/**
 * Predicate handed to the `remove` transducer of the mix channel.
 * Returning true drops the message, returning false lets it through.
 *
 * A message is dropped when some registered filter has the same `address`
 * and `action` and `compareAsc(ts, filter.ts) < 1` is false, where `ts` is
 * the timestamp of the message's first history entry. Only the first such
 * filter is acted upon. For that hit the function:
 *   1. logs the drop,
 *   2. runs the registered cleanups for the message,
 *   3. calls removeFilter with the hit,
 *   4. calls removeCleanup for the message.
 *
 * Messages addressed to 'STATE_UPDATE' are never dropped.
 *
 * NOTE(review): earlier comments described this as cancelling messages that
 * are OLDER than the filter, but the comparison as written only fires when
 * the message's first timestamp is strictly after the filter's (or the
 * comparison is NaN). The behaviour is kept exactly as it was; confirm which
 * direction is intended.
 *
 * @param {{ _history: Array<{ts: Date}>, address: String, action: String }} message
 * @returns {Boolean} true when the message must be removed from the channel
 */
const shouldDropMessage = ({ _history, address, action }) => {
  // read first so a message without history fails the same way as before
  const ts = _history[0].ts
  // never filter state updates, even if they are late
  if (address === 'STATE_UPDATE') return false

  const hit = messageHelper.filters.find(
    f =>
      f.action === action &&
      f.address === address &&
      // deliberately kept as `!(… < 1)` rather than `>= 1`: a NaN result
      // (invalid date) must still count as a hit, as it always did
      !(compareAsc(ts, f.ts) < 1)
  )
  if (hit === undefined) return false

  console.info('filtered', address, action, ts)
  // trigger cleanup functions for the dropped message
  cleanup(address, action, ts)
  // a hit always has a message timestamp that is not older than the filter,
  // so the filter and cleanup auto-removal always runs here
  removeFilter(hit)
  removeCleanup(address, action, ts)
  return true
}

// Channel every service's outgoing messages are funnelled into. The first
// argument is presumably the buffer size (confirm against medium's `chan`).
const mixChannel = chan(1000, remove(shouldDropMessage))
/**
 * Start a process on an entry channel.
 *
 * Behaves like `subscribe`, except that the callback additionally receives
 * the exit channel as its first argument. The matching and put-back logic is
 * shared with `subscribe` instead of being duplicated here.
 *
 * @param {Channel} en entry channel the messages are taken from
 * @param {Channel} ex exit channel, handed to `cb` untouched
 * @param {String} receivingAddress only messages with this address reach `cb`
 * @param {Function} cb called as cb(ex, { address, receivingAddress, ...rest })
 */
const startProcess = (en, ex, receivingAddress, cb) => {
  subscribe(en, receivingAddress, msg => cb(ex, msg))
}
/**
 * Listen on a channel and hand every message addressed to
 * `receivingAddress` to `cb`.
 *
 * Messages carrying any other address are put back on the same channel so
 * that other takers can see them too.
 *
 * @param {Channel} en channel the messages are taken from
 * @param {String} receivingAddress only messages with this address reach `cb`
 * @param {Function} cb called as cb({ address, receivingAddress, ...rest })
 */
const subscribe = (en, receivingAddress, cb) => {
  const onMessage = async msg => {
    const { address, ...rest } = msg
    if (receivingAddress !== address) {
      // not ours: return it to the channel for the other listeners
      put(en, msg)
      return
    }
    cb({ address, receivingAddress, ...rest })
  }
  go(async () => repeatTake(en, onMessage))
}
/**
 * Register the given services and start one forwarding loop per service:
 * every message taken from a service's exit channel (`svc.ex`) is put on
 * the mixChannel.
 *
 * @param {...Services} servicesToLoad services to register and listen to
 */
const loadServices = (...servicesToLoad) => {
  services.push(...servicesToLoad)

  const forwardToMixer = async msg => {
    put(mixChannel, msg)
  }

  for (const svc of servicesToLoad) {
    // TODO: at some point we will want to stop these loops and *unload* services
    go(async () => {
      repeatTake(svc.ex, forwardToMixer)
    })
  }
}
/**
 * Invoke every registered cleanup with a fresh { address, action, ts }
 * message object. Each registered entry decides by itself (through the
 * predicate it was registered with) whether it acts on the message.
 *
 * @param {String} address
 * @param {String} action
 * @param {Date} ts
 */
const cleanup = (address, action, ts) => {
  cleanups.forEach(runCleanup => {
    runCleanup({ address, action, ts })
  })
}
/**
 * Remove every registered cleanup whose predicate (`pred`) accepts the
 * message { address, action, ts }.
 *
 * The survivors are computed first and the shared `cleanups` array is then
 * rewritten in place. Splicing while iterating forward would skip the entry
 * that follows each removed one, leaving adjacent matches behind.
 *
 * @param {String} address
 * @param {String} action
 * @param {Date} ts
 */
const removeCleanup = (address, action, ts) => {
  const kept = cleanups.filter(c => !c.pred({ address, action, ts }))
  if (kept.length !== cleanups.length) {
    // mutate in place: other code holds a reference to this very array
    cleanups.splice(0, cleanups.length, ...kept)
  }
}
/**
 * Public API of this module.
 *
 * `filters` and `mixChannel` are exposed through getters that return the
 * module-level array and channel respectively.
 */
const messageHelper = {
  command,
  send,
  get filters() {
    return filters
  },
  get mixChannel() {
    return mixChannel
  },
  /**
   * Register a cleanup: `fn` is called with a message only when `pred`
   * returns a truthy value for it. The predicate is also stored on the
   * registered wrapper as `.pred`, which removeCleanup relies on.
   *
   * @param {Function} pred message => Boolean
   * @param {Function} fn message => void
   */
  registerCleanup(pred, fn) {
    const guarded = message => {
      if (!pred(message)) return
      fn(message)
    }
    guarded.pred = pred
    cleanups.push(guarded)
  },
  startProcess,
  subscribe,
  loadServices,
}
/**
 * Append a history entry { address, action, ts } to `_history`, where `ts`
 * is the current time. The array is mutated in place; nothing is returned.
 *
 * @param {Array} _history history log of the message (mutated)
 * @param {String} address
 * @param {String} action
 */
const stamp = (_history, address, action) => {
  _history.push({ address, action, ts: new Date() })
}
/**
 * Send a message on a channel.
 *
 * - `payload` defaults to an empty object, `_history` to an empty array
 * - the history is stamped with the address, action and current time
 * - when that stamp is the first entry of the history, the entry becomes a
 *   filter: removeFilter is called with it (which drops filters with the
 *   same address and action and an older timestamp) and it is then pushed
 *   onto the filter list
 * - finally { address, action, payload, _history } is put on `ch`
 *
 * @param {Channel} ch channel to put the message on
 * @param {String} address
 * @param {String} action
 * @param {Object} [payload={}]
 * @param {Array} [_history=[]] history log, mutated by the stamp
 */
async function send(ch, address, action, payload = {}, _history = []) {
  stamp(_history, address, action)

  const startsChain = _history.length === 1
  if (startsChain) {
    // first in the chain, create filter
    const head = _history[0]
    removeFilter(head)
    filters.push(head)
  }

  const message = { address, action, payload, _history }
  put(ch, message)
}
/**
 * Remove every filter that has the same address and action as the passed
 * entry and a timestamp strictly older than the entry's `ts`
 * (`compareAsc(f.ts, ts) < 0`). Filters with an equal or newer timestamp,
 * including the passed entry itself, are kept.
 *
 * The survivors are computed first and the filter array is then rewritten
 * in place. Splicing while iterating forward would skip the filter that
 * follows each removed one, so consecutive stale filters were not all
 * removed.
 *
 * @param {{ address: String, action: String, ts: Date }} entry
 */
const removeFilter = ({ address, action, ts }) => {
  const fs = messageHelper.filters
  const isStale = f =>
    f.address === address && f.action === action && compareAsc(f.ts, ts) < 0
  const kept = fs.filter(f => !isStale(f))
  if (kept.length !== fs.length) {
    // mutate in place: the array is shared module state
    fs.splice(0, fs.length, ...kept)
  }
}
/**
 * Send a command on a channel.
 *
 * Works like `send`, with two additions on the message that is put on `ch`:
 * - `responseAddress` is passed along
 * - a `removeFilter` thunk is attached; calling it invokes removeFilter
 *   with the first history entry of this command, which lets a receiver
 *   trigger filter removal for the command it received
 *
 * As in `send`: `payload` defaults to an empty object and `_history` to an
 * empty array. The history is stamped, and when the stamp is the first
 * entry it is installed as a filter after removeFilter has been called
 * with it.
 *
 * @param {Channel} ch channel to put the command on
 * @param {String} address
 * @param {String} responseAddress
 * @param {String} action
 * @param {Object} [payload={}]
 * @param {Array} [_history=[]] history log, mutated by the stamp
 */
async function command(
  ch,
  address,
  responseAddress,
  action,
  payload = {},
  _history = []
) {
  // FIXME: order is important, but it's not obvious
  stamp(_history, address, action)

  const startsChain = _history.length === 1
  if (startsChain) {
    // first in the chain, create filter
    const head = _history[0]
    removeFilter(head)
    filters.push(head)
  }

  const message = {
    address,
    action,
    responseAddress,
    _history,
    payload,
    removeFilter: () => removeFilter(_history[0]),
  }
  put(ch, message)
}
/**
 * Mixer loop: for each message taken from the mixChannel, go over all
 * registered services and, for every service whose `listens` values include
 * the message address, stamp the message and put it on that service's entry
 * channel (`svc.en`).
 *
 * The stamp is applied once per receiving service, and the same message
 * object is handed to each of them.
 */
go(async () => {
  // addresses for which the current state is NOT attached to the message
  const stateAddresses = ['STATE_UPDATE', 'STATE_TX']

  const deliver = async (svc, msg) => {
    if (!Object.values(svc.listens).includes(msg.address)) return
    // send it to each service that listens to the address
    // FIXME: add query based filtering for messages on STATE_TX
    stamp(msg._history, msg.address, msg.action)
    if (!stateAddresses.includes(msg.address)) {
      // FIXME: temporary hack, this file shouldn't magically know addresses from StateSVC anyway
      msg.currentState = window.state
    }
    put(svc.en, msg)
  }

  repeatTake(mixChannel, async msg => {
    services.forEach(svc => {
      deliver(svc, msg)
    })
  })
})

module.exports = messageHelper
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment