Skip to content

Instantly share code, notes, and snippets.

@Connoropolous
Last active March 8, 2020 22:03
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Connoropolous/c5cccca85a19cf7532df5a55d66c5460 to your computer and use it in GitHub Desktop.
Save Connoropolous/c5cccca85a19cf7532df5a55d66c5460 to your computer and use it in GitHub Desktop.
Multiply two streams by one another, producing a matrix that contains every combination of items from the two streams.
const { expect } = require('chai')
const sinon = require('sinon')
const matrix = require('../libs/matrix')
const { newMockMakeContactable } = require('rsf-contactable')
const pull = require('pull-stream')
const { tap } = require('pull-tap')
const pullParamap = require('pull-paramap')
const Pushable = require('pull-pushable')
/**
 * Combine two pull-stream sources into their "matrix": one result for every
 * possible (x, y) combination of the items of the two streams.
 *
 * Works for realtime and fixed-length sources alike, and keeps producing
 * combinations as new items arrive on either side. It does so by replicating
 * the Y stream: every x gets its own Pushable copy of Y, which first receives
 * all Ys seen so far (from a cache) and afterwards every new Y as it arrives.
 *
 * @param {Function} sourceX - a pull-stream source
 * @param {Function} sourceY - a pull-stream source
 * @param {Function} [preMergeTransformer] - a pull-stream through applied to
 *   each per-x stream of `[x, y]` pairs before its output is merged into the
 *   result. Defaults to passing the pairs through unchanged.
 *   NOTE(review): the very same through value is plugged into every per-x
 *   pipeline — confirm that any through passed here tolerates being reused.
 * @returns {Function} a Pushable pull-stream source carrying whatever the
 *   transformer produces for each pair. It ends once both inputs have ended
 *   and every per-x pipeline has finished; the first error coming from an
 *   input or from a per-x pipeline is passed downstream as the end error.
 */
const matrixify = (
  sourceX,
  sourceY,
  // default is to pass it through exactly as is
  preMergeTransformer = pull.map(a => a)
) => {
  // immediately create the source which will be returned, which is a Pushable
  const zSource = Pushable()
  // cache the Ys that have streamed in so far,
  // for the purpose of replaying them to Xs that arrive later
  const pastY = []
  // cache the existing yStreams too, for the purpose of
  // immediately forwarding new Ys to them
  const yStreams = []
  // bookkeeping needed to know when the result stream may end
  let xEnded = false
  let yEnded = false
  let openPipelines = 0

  // end the result once nothing can produce another combination
  const endIfComplete = () => {
    if (xEnded && yEnded && openPipelines === 0) zSource.end()
  }
  // hand an error to whoever consumes the result; the inputs are not aborted
  const fail = err => {
    zSource.end(err)
  }

  // standalone pipeline that drains sourceY and, while doing so, replicates
  // it into all existing yStreams (and caches the values for future ones)
  pull(
    sourceY,
    pull.drain(
      y => {
        // drop the new Y into the cache
        pastY.push(y)
        // feed the Y directly through to existing yStreams
        yStreams.forEach(yStream => {
          yStream.push(y)
        })
      },
      err => {
        if (err) return fail(err)
        yEnded = true
        // no further Ys will arrive, so let every replica finish
        yStreams.forEach(yStream => {
          yStream.end()
        })
        endIfComplete()
      }
    )
  )

  // second standalone pipeline, draining sourceX
  pull(
    sourceX,
    pull.drain(
      x => {
        const yStream = Pushable()
        // add the new yStream to the cache of yStreams
        yStreams.push(yStream)
        // replay the Ys that came in before this x arrived; wrapped in an
        // arrow so forEach's extra (index, array) arguments are not passed on
        pastY.forEach(y => {
          yStream.push(y)
        })
        // if Y is already finished this replica is complete right away
        // (a Pushable still delivers what was buffered before the end)
        if (yEnded) yStream.end()
        // count the pipeline before starting it: it may finish synchronously
        openPipelines += 1
        pull(
          yStream,
          // pair every y with this x
          pull.map(y => [x, y]),
          // pass it through the preMergeTransformer hook
          preMergeTransformer,
          // pipe new results through to the zSource
          pull.drain(
            z => {
              zSource.push(z)
            },
            err => {
              openPipelines -= 1
              if (err) return fail(err)
              endIfComplete()
            }
          )
        )
      },
      err => {
        if (err) return fail(err)
        xEnded = true
        endIfComplete()
      }
    )
  )

  return zSource
}
/**
 * Take a [contactable, prompt] pair and answer, after a while, with that
 * contactable's response to the prompt.
 *
 * The (data, callback) signature makes it suitable for pull.asyncMap.
 * This version is a placeholder: it logs the prompt, waits a random whole
 * number of milliseconds below 5000 and then calls back with a canned string.
 * A real implementation would listen, wait, validate and tear down here.
 *
 * @param {Array} pair - `[contactable, prompt]`; only `contactable.id` is read
 * @param {Function} cb - node-style callback, called as `cb(null, response)`
 */
const getResponseForPrompt = (pair, cb) => {
  const [contactable, prompt] = pair
  console.log(`${contactable.id} received: ${prompt}`)
  // random delay so responses come back at unpredictable times
  const delayMs = Math.floor(Math.random() * 5000)
  setTimeout(() => {
    cb(null, `${contactable.id} responded to: ${prompt}`)
  }, delayMs)
}
// End-to-end spec for matrixify: prompts and contactables trickle in over
// time and every (contactable, prompt) combination must yield one response.
describe('PromptPeopleResponse Matrix', function() {
  context(
    'when you begin with a two streams: prompts and contactables',
    function() {
      it('should deliver pairs of all combinations of prompts and contactables through', function(done) {
        // The last inputs arrive after 6s and each contactable then answers
        // its two prompts one at a time, each taking under 5s, so the spec can
        // need close to 16s of real time — far beyond Mocha's default timeout.
        this.timeout(20000)

        const prompts = ['prompt 1', 'prompt 2']
        const mockMakeContactable = newMockMakeContactable(sinon.spy)
        const contactablesSource = Pushable()
        const promptsSource = Pushable()
        // every contactable that joined, kept to build the expected responses
        const contactables = []
        const join = id => {
          const contactable = mockMakeContactable({ id })
          contactables.push(contactable)
          contactablesSource.push(contactable)
        }

        // the first prompt exists before anybody has joined
        promptsSource.push(prompts[0])
        setTimeout(() => {
          join('contactable1')
        }, 4000)
        // a second contactable and a second prompt arrive together
        setTimeout(() => {
          join('contactable2')
          promptsSource.push(prompts[1])
        }, 6000)

        const responses = []
        pull(
          matrixify(
            // sourceX
            contactablesSource,
            // source Y
            promptsSource,
            // preMergeTransform
            pull.asyncMap(getResponseForPrompt)
          ),
          pull.drain(response => {
            console.log(response)
            responses.push(response)
            // 2 contactables x 2 prompts
            if (responses.length !== 4) return
            try {
              // mirror the string getResponseForPrompt builds for each pair
              const expected = []
              contactables.forEach(contactable => {
                prompts.forEach(prompt => {
                  expected.push(contactable.id + ' responded to: ' + prompt)
                })
              })
              // arrival order is random, so compare as unordered sets
              expect(responses).to.have.members(expected)
              done()
            } catch (err) {
              // report assertion failures through Mocha's callback
              done(err)
            }
          })
        )
      })
    }
  )
})
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment