Last active
March 8, 2020 22:03
-
-
Save Connoropolous/c5cccca85a19cf7532df5a55d66c5460 to your computer and use it in GitHub Desktop.
Multiply two streams by one another, producing a matrix of the two streams.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const { expect } = require('chai') | |
const sinon = require('sinon') | |
const matrix = require('../libs/matrix') | |
const { newMockMakeContactable } = require('rsf-contactable') | |
const pull = require('pull-stream') | |
const { tap } = require('pull-tap') | |
const pullParamap = require('pull-paramap') | |
const Pushable = require('pull-pushable') | |
/**
 * Combine two pull-stream sources into one source that emits a result for
 * every (x, y) pairing of their items.
 *
 * Works for realtime and fixed-length sources alike. sourceY is "replicated":
 * every Y seen so far is cached and replayed to each X that shows up later,
 * and every new Y is forwarded to each X that is already known. For each X a
 * dedicated pipeline maps incoming Ys to `[x, y]`, runs them through
 * `preMergeTransformer` and pushes the outcome into the returned source.
 *
 * Termination:
 * - once sourceY ends, every per-X stream is ended after its buffered Ys;
 * - once sourceX has ended and every per-X pipeline has completed, the
 *   returned source ends;
 * - an error from sourceX, sourceY or any per-X pipeline ends the returned
 *   source with that error.
 *
 * NOTE(review): the very same `preMergeTransformer` value is plugged into
 * every per-X pipeline; confirm the through you pass tolerates being reused
 * across several pipelines.
 *
 * @param {Function} sourceX a pull-stream source
 * @param {Function} sourceY a pull-stream source
 * @param {Function} [preMergeTransformer] a pull-stream through applied to
 *   each `[x, y]` pair before it is merged into the result; defaults to an
 *   identity map
 * @returns {Function} a pull-stream source (a pull-pushable) of the results
 */
const matrixify = (
  sourceX,
  sourceY,
  // default is to pass each pair through exactly as is
  preMergeTransformer = pull.map(a => a)
) => {
  // immediately create the source which will be returned, which is a Pushable
  const zSource = Pushable()
  // cache of the Ys that have streamed in so far, replayed to Xs that join later
  const pastY = []
  // one Pushable per X seen so far, so that new Ys can be forwarded right away
  const yStreams = []
  // bookkeeping needed to know when the returned source can be ended
  let xEnded = false
  let yEnded = false
  let openPipelines = 0
  let finished = false

  // a drain that was aborted may report `true` to its done callback;
  // only a real error should be forwarded downstream
  const asError = end => (end && end !== true ? end : null)

  // close the returned source (and any per-X stream still open), exactly once
  const finish = err => {
    if (finished) return
    finished = true
    yStreams.forEach(yStream => yStream.end(err))
    zSource.end(err)
  }

  // the result is complete when no further X can arrive and every per-X
  // pipeline has run to its end
  const finishIfComplete = () => {
    if (xEnded && openPipelines === 0) finish(null)
  }

  // standalone pipeline that drains sourceY and, while doing so, replicates
  // it to all existing yStreams (and caches values for future joiners)
  pull(
    sourceY,
    pull.drain(
      y => {
        // returning false tells drain to abort sourceY; nothing consumes
        // Ys any more once the returned source has been closed
        if (finished) return false
        pastY.push(y)
        yStreams.forEach(yStream => yStream.push(y))
      },
      end => {
        const err = asError(end)
        if (err) return finish(err)
        yEnded = true
        // no more Ys will come: let every per-X pipeline run dry and end
        yStreams.forEach(yStream => yStream.end())
      }
    )
  )

  // second standalone pipeline, draining sourceX
  pull(
    sourceX,
    pull.drain(
      x => {
        // returning false tells drain to abort sourceX once we are closed
        if (finished) return false
        const yStream = Pushable()
        // register the new yStream so upcoming Ys reach it
        yStreams.push(yStream)
        // replay the Ys that came in before this X joined
        pastY.forEach(y => yStream.push(y))
        // if sourceY is already over this X will only ever see the replay
        if (yEnded) yStream.end()
        // count the pipeline before starting it: it may complete synchronously
        openPipelines++
        pull(
          yStream,
          // pair each y with this x
          pull.map(y => [x, y]),
          // pass it through the preMergeTransformer hook
          preMergeTransformer,
          // pipe results through to the zSource
          pull.drain(
            z => {
              zSource.push(z)
            },
            end => {
              openPipelines--
              const err = asError(end)
              if (err) finish(err)
              else finishIfComplete()
            }
          )
        )
      },
      end => {
        const err = asError(end)
        if (err) return finish(err)
        xEnded = true
        finishIfComplete()
      }
    )
  )

  return zSource
}
/**
 * Stand-in for asking one contactable one prompt and waiting for the answer.
 *
 * Logs that the contactable received the prompt, then, after a random delay
 * below five seconds, reports a response string through the callback.
 * The `(data, cb)` shape makes it suitable for passing to pull.asyncMap.
 *
 * @param {Array} pair a `[contactable, prompt]` tuple; `contactable.id` is read
 * @param {Function} cb node-style callback, invoked as `cb(null, response)`
 */
const getResponseForPrompt = ([contactable, prompt], cb) => {
  console.log(contactable.id + ' received: ' + prompt)

  // A real implementation would do its work here:
  // listen, wait, validate, teardown.

  const MAX_DELAY_MS = 5000
  const delayMs = Math.floor(Math.random() * MAX_DELAY_MS)

  const respond = () => {
    cb(null, contactable.id + ' responded to: ' + prompt)
  }

  setTimeout(respond, delayMs)
}
// End-to-end check of matrixify fed by two realtime (Pushable) sources:
// contactables play the role of X, prompts the role of Y, and every pair is
// resolved through getResponseForPrompt.
describe('PromptPeopleResponse Matrix', function() {
  context(
    'when you begin with a two streams: prompts and contactables',
    function() {
      it('should deliver pairs of all combinations of prompts and contactables through', function(done) {
        // Inputs are scheduled up to 6s in and each response may take up to
        // 5s, processed one at a time per contactable (worst case ~16s), so
        // mocha's default 2s timeout has to be raised for this test.
        this.timeout(20000)

        // 2 contactables x 2 prompts
        const EXPECTED_RESPONSES = 4

        const mockMakeContactable = newMockMakeContactable(sinon.spy)
        const contactablesSource = Pushable()
        const promptsSource = Pushable()

        // one prompt exists before anybody has joined
        promptsSource.push('prompt 1')
        // the first contactable joins late and must still get 'prompt 1'
        setTimeout(() => {
          const c1 = mockMakeContactable({ id: 'contactable1' })
          contactablesSource.push(c1)
        }, 4000)
        // a second contactable joins, then a new prompt arrives for everyone
        setTimeout(() => {
          const c2 = mockMakeContactable({ id: 'contactable2' })
          contactablesSource.push(c2)
          promptsSource.push('prompt 2')
        }, 6000)

        let count = 0
        pull(
          matrixify(
            // sourceX
            contactablesSource,
            // sourceY
            promptsSource,
            // preMergeTransform
            pull.asyncMap(getResponseForPrompt)
          ),
          pull.drain(response => {
            count++
            console.log(response)
            // finish as soon as every combination has produced a response
            if (count === EXPECTED_RESPONSES) done()
          })
        )
      })
    }
  )
})
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment