Skip to content

Instantly share code, notes, and snippets.

@ashnur
Created April 25, 2020 19:03
Show Gist options
  • Save ashnur/f2fb2cf230d47aea9a63123aedb5926a to your computer and use it in GitHub Desktop.
Save ashnur/f2fb2cf230d47aea9a63123aedb5926a to your computer and use it in GitHub Desktop.
const compareAsc = require('date-fns/compare_asc')
const { remove } = require('transducers-js')
const { take, repeatTake, go, put, chan } = require('medium')
// Services registered through loadServices; the mixer loop at the bottom of
// this file forwards mixChannel messages to the ones listening on the address.
const services = []
// Active filters: the first history entry ({ address, action, ts }) of every
// chain started by send/command. Read by the mixChannel predicate.
const filters = []
// Cleanup callbacks added by messageHelper.registerCleanup; each one carries
// its predicate as a `pred` property.
const cleanups = []
/**
 * Central channel that loadServices forwards every service's exit channel
 * into. It is created with a size argument of 1000 (presumably the buffer
 * size - confirm against medium's `chan`) and a transducers-js `remove`
 * transducer, so every message is tested by the predicate below and is
 * dropped when the predicate returns true.
 */
const mixChannel = chan(
1000,
remove(({ _history, address, action }) => {
// Decides whether an incoming message is dropped (true) or kept (false).
// The message is compared against every registered filter; it is dropped as
// soon as one filter yields `differs === false`.
//
// `differs` for a filter f and a message m, with ts = m._history[0].ts:
//   f.action !== m.action                      -> true  (kept)
//   f.address !== m.address                    -> true  (kept)
//   ts <= f.ts, i.e. compareAsc(ts, f.ts) < 1  -> true  (kept)
//   same action, same address and ts > f.ts    -> false (dropped)
//
// NOTE(review): the notes this comment replaces described the intent as
// cancelling matching messages that are OLDER than the filter. The comparison
// as written drops matching messages whose first timestamp is NEWER than the
// filter and keeps the older ones. Confirm which of the two is intended.
const ts = _history[0].ts
let olderMessages = true
/**
 * `every` is true only when `differs` was true for all filters (or there
 * are no filters); negating it gives "some filter matched", which is the
 * value `remove` needs in order to drop the message.
 */
return !messageHelper.filters.every((f, i) => {
if (address === 'STATE_UPDATE') return true //never filter state updates, even if they are late
const differs =
f.action !== action || f.address !== address || compareAsc(ts, f.ts) < 1
// true when the message's first timestamp is strictly before the filter's
olderMessages = compareAsc(ts, f.ts) < 0
/**
 * The filter matched: log it and run the registered cleanup callbacks
 * for this address/action/timestamp.
 */
if (!differs) {
console.info('filtered', address, action, ts)
cleanup(address, action, ts)
}
/**
 * NOTE(review): whenever `differs` is false, compareAsc(ts, f.ts) is 1,
 * so `olderMessages` is false and this branch runs for every filtered
 * message. removeFilter(f) removes filters with f's address/action that
 * are older than f (not f itself). It mutates the array `every` is
 * walking, but `every` stops right after because `differs` is false.
 */
if (!differs && !olderMessages) {
removeFilter(f)
removeCleanup(address, action, ts)
}
return differs
})
})
)
/**
 * Consume `en` in a loop and hand every message addressed to
 * `receivingAddress` to `cb`, together with the exit channel `ex`.
 * Messages for any other address are put back on `en`.
 *
 * @param {Object} en channel the messages are taken from
 * @param {Object} ex channel handed to `cb` as its first argument
 * @param {String} receivingAddress address this process handles
 * @param {Function} cb called as cb(ex, message); the message carries
 *   `address`, `receivingAddress` and the remaining fields of the original
 */
const startProcess = (en, ex, receivingAddress, cb) => {
  const handle = async msg => {
    const { address, ...others } = msg
    if (address !== receivingAddress) {
      // put it back to the channel, so others can see it too
      put(en, msg)
      return
    }
    cb(ex, { address, receivingAddress, ...others })
  }
  go(async () => repeatTake(en, handle))
}
/**
 * Consume `en` in a loop and call `cb` with every message addressed to
 * `receivingAddress`. Messages for any other address are put back on `en`.
 *
 * @param {Object} en channel the messages are taken from
 * @param {String} receivingAddress address the subscriber is interested in
 * @param {Function} cb called as cb(message); the message carries
 *   `address`, `receivingAddress` and the remaining fields of the original
 */
const subscribe = (en, receivingAddress, cb) => {
  const handle = async msg => {
    const { address, ...others } = msg
    if (address !== receivingAddress) {
      // put it back to the channel, so others can see it too
      put(en, msg)
      return
    }
    cb({ address, receivingAddress, ...others })
  }
  go(async () => repeatTake(en, handle))
}
/**
 * Register services and start forwarding their output.
 *
 * Every service is appended to the module-level `services` list, then one
 * loop per service takes each message from the service's exit channel
 * (`svc.ex`) and puts it on `mixChannel`.
 *
 * @param {...Object} servicesToLoad services exposing an `ex` channel
 */
const loadServices = (...servicesToLoad) => {
  services.push(...servicesToLoad)
  const forwardToMixer = async msg => {
    put(mixChannel, msg)
  }
  for (const svc of servicesToLoad) {
    // TODO: at some point we will want to stop these loops and *unload* services
    go(async () => {
      repeatTake(svc.ex, forwardToMixer)
    })
  }
}
/**
 * Offer a filtered message to every registered cleanup callback.
 * Each callback receives its own `{ address, action, ts }` object.
 *
 * @param {String} address
 * @param {String} action
 * @param {Date} ts
 */
const cleanup = (address, action, ts) => {
  cleanups.forEach(runCleanup => {
    runCleanup({ address, action, ts })
  })
}
/**
 * Remove every registered cleanup whose predicate (`pred`) accepts the
 * given message description.
 *
 * @param {String} address
 * @param {String} action
 * @param {Date} ts
 */
const removeCleanup = (address, action, ts) => {
  // Walk by index and only advance when nothing was removed: splicing inside
  // a forEach shifts the next entry into the current slot and skips it, which
  // left every second one of consecutive matching cleanups registered.
  let i = 0
  while (i < cleanups.length) {
    if (cleanups[i].pred({ address, action, ts })) {
      cleanups.splice(i, 1)
    } else {
      i += 1
    }
  }
}
/**
 * Public API of this module: message senders, process/subscription helpers,
 * service loading, cleanup registration and read-only access to the filter
 * list and the mixer channel.
 */
const messageHelper = {
  command,
  send,
  // live view of the module-level filter list (same array, not a copy)
  get filters() {
    return filters
  },
  get mixChannel() {
    return mixChannel
  },
  /**
   * Register `fn` to run for every filtered message accepted by `pred`.
   * The predicate is kept on the stored callback as `pred` so that
   * removeCleanup can find the entry again.
   *
   * @param {Function} pred receives { address, action, ts }
   * @param {Function} fn receives the same message when `pred` is truthy
   */
  registerCleanup(pred, fn) {
    const guarded = message => {
      if (!pred(message)) {
        return
      }
      fn(message)
    }
    guarded.pred = pred
    cleanups.push(guarded)
  },
  startProcess,
  subscribe,
  loadServices,
}
/**
 * Append a history entry `{ address, action, ts }` to `_history`, with `ts`
 * set to the current time. The history is mutated in place.
 *
 * @param {Array} _history history log of the message
 * @param {String} address
 * @param {String} action
 * @throws {TypeError} when `_history` cannot be pushed to
 */
const stamp = (_history, address, action) => {
  // Fail with a readable message instead of the bare
  // "_history.push is not a function" the unguarded call produced.
  if (_history == null || typeof _history.push !== 'function') {
    throw new TypeError('stamp: _history must be an array of history entries')
  }
  _history.push({ address, action, ts: new Date() })
}
/**
 * Send a message on `ch`.
 *
 * The history is stamped with the current address/action first. When that
 * stamp is the only entry, this call starts a new chain: older filters for
 * the same address and action are removed and the first history entry is
 * registered as the new filter. Finally the message is put on the channel;
 * the put is not awaited.
 *
 * @param {Object} ch channel to put the message on
 * @param {String} address
 * @param {String} action
 * @param {Object} [payload={}]
 * @param {Array} [_history=[]] history log, mutated in place
 * @returns {Promise<undefined>}
 */
async function send(ch, address, action, payload = {}, _history = []) {
  stamp(_history, address, action)
  const startsChain = _history.length === 1
  if (startsChain) {
    // first in the chain, create filter
    const origin = _history[0]
    removeFilter(origin)
    filters.push(origin)
  }
  put(ch, { address, action, payload, _history })
}
/**
 * Remove every filter that has the same address and action as the passed
 * entry and a strictly older timestamp. Filters with an equal or newer
 * timestamp are kept. The filter list is mutated in place.
 *
 * @param {Object} entry
 * @param {String} entry.address
 * @param {String} entry.action
 * @param {Date} entry.ts
 */
const removeFilter = ({ address, action, ts }) => {
  const fs = messageHelper.filters
  // Walk by index and only advance when nothing was removed: splicing inside
  // a forEach shifts the next filter into the current slot and skips it, so
  // consecutive stale filters were not all removed.
  let i = 0
  while (i < fs.length) {
    const f = fs[i]
    const isOlderMatch =
      f.address === address && f.action === action && compareAsc(f.ts, ts) < 0
    if (isOlderMatch) {
      fs.splice(i, 1)
    } else {
      i += 1
    }
  }
}
/**
 * Send a command on `ch`.
 *
 * Works like sending a plain message, but the message additionally carries
 * a `responseAddress` and a `removeFilter` function. Calling that function
 * runs removeFilter with the first history entry of this command, which
 * lets the receiver clean up filters for it (distributed cancellation).
 *
 * The history is stamped first. When that stamp is the only entry, older
 * filters for the same address and action are removed and the first
 * history entry is registered as the new filter. The put is not awaited.
 *
 * @param {Object} ch channel to put the command on
 * @param {String} address
 * @param {String} responseAddress
 * @param {String} action
 * @param {Object} [payload={}]
 * @param {Array} [_history=[]] history log, mutated in place
 * @returns {Promise<undefined>}
 */
async function command(
  ch,
  address,
  responseAddress,
  action,
  payload = {},
  _history = []
) {
  // FIXME: order is important, but it's not obvious
  stamp(_history, address, action)
  const startsChain = _history.length === 1
  if (startsChain) {
    // first in the chain, create filter
    const origin = _history[0]
    removeFilter(origin)
    filters.push(origin)
  }
  const message = {
    address,
    action,
    responseAddress,
    _history,
    payload,
    removeFilter: () => removeFilter(_history[0]),
  }
  put(ch, message)
}
/**
 * Mixer loop: for every message taken from mixChannel, go over the loaded
 * services and, for each service whose `listens` values include the
 * message address, stamp the message and put it on that service's entry
 * channel (`svc.en`).
 */
go(async () => {
  // messages on these addresses do not get the state snapshot attached
  const stateAddresses = ['STATE_UPDATE', 'STATE_TX']
  repeatTake(mixChannel, async msg => {
    services.forEach(async svc => {
      const listensToAddress = Object.values(svc.listens).includes(msg.address)
      if (!listensToAddress) {
        return
      }
      // send it to each service that listens to the address
      // FIXME: add query based filtering for messages on STATE_TX
      stamp(msg._history, msg.address, msg.action)
      if (!stateAddresses.includes(msg.address)) {
        // FIXME: temporary hack, this file shouldn't magically know addresses from StateSVC anyway
        msg.currentState = window.state
      }
      put(svc.en, msg)
    })
  })
})
module.exports = messageHelper
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment