Skip to content

Instantly share code, notes, and snippets.

@jasonkuhrt
Last active November 12, 2017 03:50
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jasonkuhrt/571bc70980a03eeb88751477a812c77f to your computer and use it in GitHub Desktop.
Save jasonkuhrt/571bc70980a03eeb88751477a812c77f to your computer and use it in GitHub Desktop.
Most.js Buffer Until
/**
 * Build a most.js source that holds back every event of `stream` until
 * `promise` fulfills, then replays the held events (with their original
 * timestamps) and passes all later events straight through.
 *
 * @param {{source: {run: Function}}} stream - Stream whose `source.run(sink, scheduler)` is subscribed to.
 * @param {Promise<*>} promise - Gate; its fulfillment value is ignored, its rejection is reported as a stream error.
 * @returns {{run: Function}} Source object suitable for `new Most.Stream(...)`.
 */
function bufferUntil(stream, promise) {
  return {
    run: (sink, scheduler) => {
      let didResolve = false
      let isDisposed = false
      // Holds the [time, value] of an upstream `end` that arrived while the gate was closed.
      let pendingEnd = null
      const buffer = []

      const upstream = stream.source.run(
        {
          event: (t, x) => {
            if (didResolve) {
              sink.event(t, x)
            } else {
              buffer.push([t, x])
            }
          },
          // Without an `end` handler a finite upstream would throw on completion
          // and the downstream sink would never be told the stream ended.
          end: (t, x) => {
            if (didResolve) {
              sink.end(t, x)
            } else {
              pendingEnd = [t, x]
            }
          },
          // Upstream errors are not buffered; they are forwarded immediately.
          error: (t, e) => sink.error(t, e),
        },
        scheduler,
      )

      // Open the gate: replay buffered events in arrival order, then a deferred end.
      const flush = () => {
        if (isDisposed) {
          return
        }
        didResolve = true
        try {
          const queued = buffer.splice(0, buffer.length)
          queued.forEach(([t, x]) => sink.event(t, x))
          if (pendingEnd !== null) {
            const [t, x] = pendingEnd
            pendingEnd = null
            sink.end(t, x)
          }
        } catch (e) {
          // A throw here would otherwise escape the setImmediate callback
          // as an uncaught exception instead of a stream error.
          sink.error(scheduler.now(), e)
        }
      }

      promise.then(
        () => {
          // The flush is deferred one macrotask after fulfillment; events that
          // arrive in between are still buffered because the gate flips in flush.
          setImmediate(flush)
        },
        e => {
          if (!isDisposed) {
            // Sinks receive (time, error); stamp the rejection with the current scheduler time.
            sink.error(scheduler.now(), e)
          }
        },
      )

      // Return a disposable so the consumer can tear down the upstream subscription.
      return {
        dispose: () => {
          isDisposed = true
          buffer.length = 0
          pendingEnd = null
          return upstream.dispose()
        },
      }
    },
  }
}
// Fluent form: `stream.bufferUntil(promise)` wraps the receiver in a
// bufferUntil source and returns it as a new Most.Stream.
Most.Stream.prototype.bufferUntil = function(promise) {
  const gatedSource = bufferUntil(this, promise)
  return new Most.Stream(gatedSource)
}
// Demo: the same promise supplies the function for `ap` and gates the counter,
// so counter values are buffered until the function is available.
const promiseOfFunction = Promise.resolve(x => x * 10)
const functionStream = Most.fromPromise(promiseOfFunction)
// Running count seeded with 0, advanced once per event of the 100-period stream.
const counterStream = Most.periodic(100).scan((acc, _) => acc + 1, 0)
const gatedCounterStream = counterStream.bufferUntil(promiseOfFunction)
const logTap = console.log.bind(null, "tap:")
functionStream
  .ap(gatedCounterStream)
  .tap(logTap)
  .drain()
tap: 0
tap: 10
tap: 20
tap: 30
tap: 40
tap: 50
tap: 60
tap: 70
tap: 80
tap: 90
tap: 100
tap: 110
tap: 120
tap: 130
tap: 140
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment