Skip to content

Instantly share code, notes, and snippets.

@jasonkuhrt
Last active November 12, 2017 03:02
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jasonkuhrt/29ecb80be43fbd398abf3aa396f9956e to your computer and use it in GitHub Desktop.
Save jasonkuhrt/29ecb80be43fbd398abf3aa396f9956e to your computer and use it in GitHub Desktop.
Most.js Buffered Apply
function bufferedApply(streamOfFunctions, streamOfValues) {
const source = {
run: (sink, scheduler) => {
let f = null
const buffer = []
streamOfFunctions.source.run(
{
event: (t, newF) => {
f = newF
if (buffer.length) {
buffer.map(f).forEach(x => {
sink.event(t, x)
})
buffer.length = 0
}
},
error: sink.error.bind(sink),
},
scheduler,
)
streamOfValues.source.run(
{
event: (t, newX) => {
if (f) {
sink.event(t, f(newX))
} else {
buffer.push(newX)
}
},
error: sink.error.bind(sink),
},
scheduler,
)
// return ??? // Disposable<A>
},
}
return source
}
Most.Stream.prototype.bufferedApply = function(streamOfValues) {
return new Most.Stream(bufferedApply(this, streamOfValues))
}
Most.fromPromise(Promise.resolve(x => x * 10))
.bufferedApply(Most.periodic(100).scan((acc, _) => acc + 1, 0))
.tap(console.log.bind(null, "tap:"))
.drain()
tap: 0
tap: 10
tap: 20
tap: 30
tap: 40
tap: 50
tap: 60
tap: 70
tap: 80
tap: 90
tap: 100
tap: 110
tap: 120
tap: 130
tap: 140
tap: 150
tap: 160
tap: 170
tap: 180
tap: 190
tap: 200
tap: 210
tap: 220
tap: 230
tap: 240
tap: 250
tap: 260
tap: 270
tap: 280
tap: 290
tap: 300
tap: 310
tap: 320
tap: 330
tap: 340
tap: 350
tap: 360
tap: 370
tap: 380
tap: 390
tap: 400
tap: 410
tap: 420
tap: 430
tap: 440
tap: 450
tap: 460
tap: 470
tap: 480
tap: 490
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment