Skip to content

Instantly share code, notes, and snippets.

@Frando
Created December 3, 2019 13:39
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Frando/21bc9e796544692b51de7e85edd1983a to your computer and use it in GitHub Desktop.
Save Frando/21bc9e796544692b51de7e85edd1983a to your computer and use it in GitHub Desktop.
const tape = require('tape')
const { Kappa } = require('..')
const { runAll } = require('./lib/util')
tape.only('simple source', t => {
const kappa = new Kappa()
kappa.use('view1', makeSimpleSource(), makeSimpleView())
kappa.use('view2', makeSimpleSource(), makeSimpleView())
kappa.api.view1.source.push(1)
kappa.api.view1.source.push(2)
kappa.api.view2.source.push(3)
kappa.api.view2.source.push(4)
runAll([
cb => kappa.api.view1.collect((err, res) => {
t.error(err)
t.deepEqual(res, [1, 2])
cb()
}),
cb => kappa.api.view2.collect((err, res) => {
t.error(err)
t.deepEqual(res, [3, 4])
cb()
}),
cb => t.end()
])
})
function makeSimpleView () {
let res = []
const view = {
map (msgs, next) {
res = res.concat(msgs)
next()
},
api: {
collect (kappa, cb) {
this.ready(() => cb(null, res))
}
}
}
return view
}
function makeSimpleSource (opts = {}) {
const buf = []
const maxBatch = opts.maxBatch || 10
let flow = null
let state = 0
const source = {
open (cb, _flow) {
flow = _flow
cb()
},
pull (next) {
const end = Math.min(state + maxBatch, buf.length)
const messages = buf.slice(state, end)
next({
messages,
pending: end < buf.length,
onindexed (cb) {
state = end
cb()
}
})
},
get api () {
return {
push (kappa, value) {
buf.push(value)
if (flow) flow.update()
}
}
}
}
return source
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment