Skip to content

Instantly share code, notes, and snippets.

@Frando
Created November 18, 2019 22:26
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Frando/85a6735e99e037ec28eddf82969daa0c to your computer and use it in GitHub Desktop.
// This is an example for a sparse source for kappa5 pubsub
const Bitfield = require('bitfield-db')
const nanoiterator = require('nanoiterator')
const ram = require('random-access-memory')
// Public entry point: wraps SparseIndexer construction so consumers
// never instantiate the class directly.
function createIndexer (handlers, opts) {
  return new SparseIndexer(handlers, opts)
}
module.exports = createIndexer
// Kappa source that indexes a sparse hypercore feed. It keeps its own
// RAM-backed "indexed" bitfield next to the hypercore's "have" bitfield so
// pull() can hand only downloaded-but-unindexed messages to kappa, and it
// registers a hypercore extension to exchange topic queries/responses
// with peers.
class SparseIndexer {
// @param {object} handlers - kappa-supplied handlers; `handlers.onupdate`
//   is attached to the feed's "append"/"download" events in open().
// @param {object} opts - { feed, live, onquery }.
//   `onquery(topic, cb)` (required) answers incoming peer queries;
//   `live` defaults to true.
constructor (handlers, opts) {
this.handlers = handlers
this.feed = opts.feed
// Seqs this indexer has already processed (separate from what the feed
// has downloaded).
this.indexed = new Bitfield(ram())
this.live = typeof opts.live === 'undefined' ? true : opts.live
if (!opts.onquery) throw new Error('onquery opt is required.')
this._onquery = opts.onquery
// Register query extension.
this._registerExtension()
}
// Broadcast a topic query to all connected peers via the extension.
// Peers respond with { feedKey: [seqs] } maps (see _onExtensionMessage).
subscribe (topic) {
this._ext.broadcast({
type: 'query',
topic
})
}
// Kappa source lifecycle hook: resolve the feed key, wire update
// handlers, and (in live mode) keep downloading newly appended remote
// blocks. Calls `done` after the first feed.update() round-trip.
open (done) {
this.feed.ready(() => {
this.id = this.feed.key.toString('hex')
// Attach update handlers.
this.feed.on('append', this.handlers.onupdate)
this.feed.on('download', this.handlers.onupdate)
if (this.live) {
// This is live mode in sparse mode.
// NOTE(review): downloads from the peer's current length onward —
// presumably so future appends arrive despite sparse mode; confirm
// this is intended to skip the peer's existing backlog.
this.feed.once('remote-update', peer => {
this.feed.download({ start: peer.remoteLength })
})
}
// Download latest block so we know length.
this.feed.update(done)
})
}
// Kappa source lifecycle hook: collect up to 20 downloaded-but-unindexed
// seqs, fetch their blocks in consecutive batches, and report them via
// done(err, messages, workLeft).
pull (state, done) {
const self = this
const feed = this.feed
// Hypercore's bitfield of locally available blocks.
const has = this.feed.bitfield
const indexed = this.indexed
const ite = nanoiterator({ next: iterate })
// Cursor over seqs; shared by successive iterate() calls.
let i = -1
collect(ite, 20, (seqs) => {
if (seqs.length) index(seqs)
else done()
})
// This iterates over the full bitfield of the hypercore
// (starting at 0) and for each 1 (= available seq) it
// checks the indexed bitfield if this seq was already
// indexed. It does this, starting at 0, everytime
// the pull function is invoked. This is of course
// quite inefficient, especially for long feeds that
// are mostly indexed. However I did not yet find
// a way to make this more efficient without
// recreating the hypercore bitfield. Maybe the latter
// is what's needed, but not sure yet. Best would be
// if there'd be a fast way to XOR two bitfields.
// Yields the next downloaded-but-unindexed seq, or null when the feed's
// current length is exhausted.
function iterate (cb) {
let have = false
while (have === false) {
i++
if (i === feed.length) return cb(null)
have = has.get(i)
}
self.indexed.has(i, (err, exists) => {
if (!exists) cb(i)
else iterate(cb)
})
}
// Fetch the blocks for `seqs` and hand them to finish().
// NOTE(review): the `next` parameter is never used — collect() invokes
// index(seqs) with a single argument.
function index (seqs, next) {
// Sort seqs into consecutive batches because
// hypercore.getBatch is more efficient.
const batches = intoBatches(seqs)
// wait:false — blocks are known to be downloaded, so never stall.
const opts = { wait: false }
// Now get all batches and combine the results.
let pending = batches.length
let results = []
batches.forEach(batch => {
const { start, end } = batch
feed.getBatch(start, end, opts, (err, res) => onbatch(err, res, start))
})
// Fan-in: append each batch's { key, seq, value } messages, then call
// finish() once every batch arrived.
// NOTE(review): `err` from getBatch is ignored — a failed batch would
// throw on res.map. TODO confirm whether errors are possible here.
function onbatch (err, res, start) {
res = res.map((value, i) => {
return {
key: feed.key.toString('hex'),
seq: start + i,
value
}
})
results = results.concat(res)
if (--pending === 0) finish(null, results, seqs)
}
}
// Now add the indexed seqs to our indexed bitfield
// and pass on the messages.
function finish (err, results, seqs) {
for (const seq of seqs) {
indexed.add(seq)
}
indexed.flush(() => {
// rank = number of indexed seqs below feed.length; if fewer than the
// total downloaded blocks, another pull round is needed (workLeft).
indexed.rank(feed.length, (err, count) => {
let workLeft = count < has.total()
done(null, results, workLeft)
})
})
}
}
// Register the JSON-encoded query/response extension on the feed.
// NOTE(review): the `queryCallback` parameter is unused.
_registerExtension (queryCallback) {
const name = '__sparse-indexer-query'
this._ext = this.feed.registerExtension(name, {
encoding: 'json',
onmessage: this._onExtensionMessage.bind(this)
})
}
// Handle an extension message from a peer:
// - "query": answer from the local view via this._onquery and send a
//   "response" back to the querying peer (errors are silently dropped).
// - "response": download any seqs for our feed that we don't yet have.
_onExtensionMessage (message, peer) {
if (message.type === 'query') {
this._onquery(message.topic, (err, feeds) => {
if (err) return
const res = {
type: 'response',
topic: message.topic,
feeds
}
this._ext.send(res, peer)
})
}
if (message.type === 'response') {
let { feeds } = message
// TODO: This is where the "multifeed" part would come in.
// Add missing feeds etc.
if (feeds[this.id]) {
let missingSeqs = feeds[this.id].filter(seq => !this.feed.has(seq))
const batches = intoBatches(missingSeqs)
batches.forEach(({ start, end }) => {
this.feed.download({ start, end })
})
}
}
}
}
// Drain up to `max` values from the callback-style iterator `ite`,
// invoking `cb` with the collected array once `max` items arrived or the
// iterator signals exhaustion by yielding null.
function collect (ite, max, cb) {
  const collected = []
  const step = (seq) => {
    if (seq === null) return cb(collected)
    collected.push(seq)
    if (collected.length >= max) return cb(collected)
    ite.next(step)
  }
  ite.next(step)
}
// Group an ascending array of sequence numbers into consecutive
// half-open [start, end) batches, e.g. [1, 2, 5] -> [{start:1, end:3},
// {start:5, end:6}] — the shape hypercore's getBatch/download expect.
// @param {number[]} seqs - sorted, ascending sequence numbers
// @returns {Array<{start: number, end: number}>}
function intoBatches (seqs) {
  // Fix: the original emitted a bogus { start: undefined, end: NaN }
  // batch for empty input (reachable when a peer response contains no
  // missing seqs), which would be passed straight to feed.download().
  if (seqs.length === 0) return []
  const batches = []
  let start = seqs[0]
  let end = start + 1
  // First element already seeds start/end, so scan from index 1.
  for (let i = 1; i < seqs.length; i++) {
    const seq = seqs[i]
    if (seq > end) {
      // Gap found: close the current batch and open a new one.
      batches.push({ start, end })
      start = seq
    }
    end = seq + 1
  }
  batches.push({ start, end })
  return batches
}
❯ npm run test
> kappa-core@5.0.0-alpha1 test /home/bit/Code/node/kappa-core
> tape test/*.js
TAP version 13
# sparse
First replication done
----------------------
local
Key: 9182f3..
Writable: true
Length: 101
Downloaded: 101
Topics: red: 26 blue: 25 green: 25 yellow: 25
remote
Key: 9182f3..
Writable: false
Length: 101
Downloaded: 0
Topics:
Queried for "red"
-----------------
local
Key: 9182f3..
Writable: true
Length: 101
Downloaded: 101
Topics: red: 26 blue: 25 green: 25 yellow: 25
remote
Key: 9182f3..
Writable: false
Length: 101
Downloaded: 26
Topics: red: 26
Appended two reds one blue
--------------------------
local
Key: 9182f3..
Writable: true
Length: 104
Downloaded: 104
Topics: red: 28 blue: 26 green: 25 yellow: 25
remote
Key: 9182f3..
Writable: false
Length: 104
Downloaded: 29
Topics: red: 28 blue: 1
1..0
# tests 0
# pass 0
# ok
const tape = require('tape')
const hypercore = require('hypercore')
const ram = require('random-access-memory')
const { runAll, replicate } = require('./lib/util')
const { Kappa } = require('kappa-core')
const createIndexer = require('./hypercore-sparse')
const topics = ['red', 'blue', 'green', 'yellow']
// End-to-end demo of the sparse indexer: a writable "local" feed and a
// sparse "remote" clone replicate; the remote queries for topic "red" and
// only the matching blocks get downloaded; new appends then flow through
// the live subscription.
// NOTE(review): this test contains no assertions (the TAP output reports
// "# tests 0") — it only logs state. Consider asserting the remote's
// downloaded() counts after each phase.
tape.only('sparse', async t => {
var local, remote
runAll([
// Create two feeds and a kappa for each.
cb => {
local = init()
local.feed.ready(cb)
},
cb => {
// Remote shares the local feed's key but starts empty (sparse).
remote = init(local.feed.key)
remote.feed.ready(cb)
},
// Fill the local feed with some data.
cb => {
const batch = []
for (let i = 0; i <= 100; i++) {
// 101 messages cycling through the four topics.
batch[i] = { topic: topics[i % 4], i }
}
local.feed.append(batch, cb)
},
// Replicate the two feeds.
cb => replicate(local.feed, remote.feed, cb),
cb => logAll(cb, 'First replication done'),
// Make a query
cb => {
remote.kappa.api.topics.remoteQuery('red')
// setImmediate gives the extension round-trip a chance to complete.
setImmediate(cb)
},
cb => logAll(cb, 'Queried for "red"'),
// Add some more data
cb => {
local.feed.append([
{ topic: 'red' },
{ topic: 'blue' },
{ topic: 'red' }
], cb)
},
cb => setImmediate(cb),
cb => logAll(cb, 'Appended two reds one blue'),
// End
cb => t.end()
])
// Print a headline plus the state of both feeds, then resume the outer
// sequence. NOTE(review): the inner `cb` parameters shadow the outer one.
function logAll (cb, msg) {
console.log(`\n${msg}\n${'-'.repeat(msg.length)}\n`)
runAll([
cb => log('local', local.feed, local.kappa, cb),
cb => log('remote', remote.feed, remote.kappa, cb),
() => cb()
])
}
})
// Build a { feed, kappa } pair: an in-memory sparse hypercore (optionally
// cloning an existing key) wired to a Kappa instance carrying the topics
// view and the sparse indexer source.
function init (key) {
  const feed = hypercore(ram, key, {
    valueEncoding: 'json',
    sparse: true
  })
  const kappa = new Kappa()
  kappa.use('topics', createTopicView())
  kappa.source('sparsefeed', createIndexer, {
    feed,
    // Answer incoming remote queries from our local topics view.
    onquery (topic, cb) {
      kappa.api.topics.query(topic, cb)
    }
  })
  return { feed, kappa }
}
// In-memory kappa view that groups message seqs by their `topic` field.
function createTopicView () {
  // topic name -> [{ seq, key }]
  const topics = {}
  return {
    // Record every message that carries a topic; messages without one are
    // skipped.
    map (msgs, next) {
      for (const msg of msgs) {
        const topic = msg.value && msg.value.topic
        if (!topic) continue
        if (!topics[topic]) topics[topic] = []
        topics[topic].push({ seq: msg.seq, key: msg.key })
      }
      next()
    },
    api: {
      // cb(null, { feedKey: [seqs] }) for a known topic; bare cb() when
      // the topic has never been seen.
      query (kappa, topic, cb) {
        this.ready(() => {
          const entries = topics[topic]
          if (!entries) return cb()
          const feeds = {}
          for (const { key, seq } of entries) {
            if (!feeds[key]) feeds[key] = []
            feeds[key].push(seq)
          }
          cb(null, feeds)
        })
      },
      // Hand back the raw topic map once the view is caught up.
      all (kappa, cb) {
        this.ready(() => cb(null, topics))
      },
      // Broadcast a topic query through every flow source that supports
      // querying (i.e. the sparse indexer's subscribe mechanism).
      remoteQuery (kappa, topic) {
        for (const flow of kappa.flowsByView(this.name)) {
          if (flow.source.query) {
            flow.source.query(topic)
          }
        }
      }
    }
  }
}
// Print a summary of one feed/kappa pair — key, writability, length,
// downloaded-block count, and per-topic tallies — then invoke cb.
function log (name, feed, kappa, cb) {
  kappa.api.topics.all((err, res) => {
    console.log(` ${name}
Key: ${shortkey(feed)}
Writable: ${feed.writable}
Length: ${feed.length}
Downloaded: ${feed.downloaded()}
Topics: ${summarize(res)}`)
    cb()
  })
  // Render "topic: count " pairs on a single line.
  function summarize (topics) {
    return Object.entries(topics)
      .map(([topic, entries]) => `${topic}: ${entries.length} `)
      .join('')
  }
}
// Return a short display label ("a1b2c3..") for a feed's public key.
// Side effect kept from the original: the label is cached on the feed as
// `_label`.
function shortkey (feed) {
  const label = `${feed.key.toString('hex').slice(0, 6)}..`
  feed._label = label
  return label
}
// Right-pad `str` with spaces up to length `i`; longer strings pass
// through untouched. (Currently unused in this file.)
function pad (str, i) {
  return str.padEnd(i, ' ')
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment