Skip to content

Instantly share code, notes, and snippets.

@Frando
Last active October 25, 2019 08:11
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Frando/fc029021cc0aa69a1b147fb28fb3a4ea to your computer and use it in GitHub Desktop.
Save Frando/fc029021cc0aa69a1b147fb28fb3a4ea to your computer and use it in GitHub Desktop.
const tape = require('tape')
const hypercore = require('hypercore')
const ram = require('random-access-memory')
const { runAll, replicate } = require('./lib/util')
const { Kappa } = require('..')
const createIndexer = require('../sources/hypercore-sparse')
const topics = ['red', 'blue', 'green', 'yellow']
function initSparse (kappa, feed) {
kappa.use('topics', createTopicView())
kappa.source('sparsefeed' , createIndexer, {
feed,
onquery (topic, cb) {
kappa.api.topics.query(topic, cb)
}
})
}
function createTopicView () {
const topics = {}
const view = {
map (msgs, next) {
for (const msg of msgs) {
if (msg.value && msg.value.topic) {
const topic = msg.value.topic
topics[topic] = topics[topic] || []
topics[topic].push({ seq: msg.seq, key: msg.key })
}
}
next()
},
api: {
query (kappa, topic, cb) {
this.ready(() => {
if (!topics[topic]) return cb()
const feeds = topics[topic].reduce((agg, msg) => {
agg[msg.key] = agg[msg.key] || []
agg[msg.key].push(msg.seq)
return agg
}, {})
cb(null, feeds)
})
},
all (kappa, cb) {
this.ready(() => cb(null, topics))
},
requestTopic (kappa, topic) {
const flows = kappa.flowsByView(this.name)
for (let flow of flows) {
if (flow.source.requestTopic) {
flow.source.requestTopic(topic)
}
}
}
}
}
return view
}
tape.only('sparse', async t => {
// const cores = { local: null, remote: null }
var local, remote
var kloc, krem
var remotereq
const opts = {
valueEncoding: 'json',
sparse: true
}
await runAll([
cb => {
local = hypercore(ram, opts)
local.ready(cb)
},
cb => {
remote = hypercore(ram, local.key, opts)
remote.ready(cb)
},
cb => {
kloc = new Kappa()
krem = new Kappa()
initSparse(kloc, local)
initSparse(krem, remote)
cb()
},
cb => {
const batch = []
for (let i = 0; i <= 100; i++) {
let topic = topics[i % 4]
batch[i] = { topic, i }
}
local.append(batch, cb)
},
cb => replicate(local, remote, cb),
cb => logAll(cb, 'First replication done'),
cb => {
krem.api.topics.requestTopic('red')
setImmediate(cb)
},
cb => logAll(cb, 'Queried for "red"'),
cb => {
local.append([
{ topic: 'red' },
{ topic: 'blue' },
{ topic: 'red' }
], cb)
},
cb => setImmediate(cb),
cb => logAll(cb, 'Appended'),
cb => {
cb()
}
])
t.end()
async function logAll (cb, msg) {
console.log(`\n${msg}\n${'-'.repeat(msg.length)}\n`)
await runAll([
cb => log('local', local, kloc, cb),
cb => log('remote', remote, krem, cb)
])
cb()
}
})
function log (name, feed, kappa, cb) {
kappa.api.topics.all((err, res) => {
console.log(` ${name}
Key: ${shortkey(feed)}
Length: ${feed.length}
Downloaded: ${feed.downloaded()}
Topics: ${logTopics(res)}`)
cb()
})
function logTopics (topics) {
let str = ''
for (let [key, value] of Object.entries(topics)) {
str += `${key}: ${value.length} `
}
return str
}
}
function shortkey (feed) {
feed._label = feed.key.toString('hex').substring(0, 6) + '..'
return feed._label
}
function pad (str, i) {
if (str.length < i) str = str + ' '.repeat(i - str.length)
return str
}
After: first replication
----------------------
local
Key: 49b93d..
Length: 101
Downloaded: 101
Topics: red: 26 blue: 25 green: 25 yellow: 25
remote
Key: 49b93d..
Length: 101
Downloaded: 1
Topics: red: 1
After: Remote issued query "red"
-----------------
local
Key: 49b93d..
Length: 101
Downloaded: 101
Topics: red: 26 blue: 25 green: 25 yellow: 25
remote
Key: 49b93d..
Length: 101
Downloaded: 26
Topics: red: 26
After: Local appended blocks
--------
local
Key: 49b93d..
Length: 104
Downloaded: 104
Topics: red: 28 blue: 26 green: 25 yellow: 25
remote
Key: 49b93d..
Length: 104
Downloaded: 29
Topics: red: 28 blue: 1
const tape = require('tape')
const hypercore = require('hypercore')
const ram = require('random-access-memory')
const { runAll, replicate } = require('./lib/util')
const { Kappa } = require('..')
const createIndexer = require('../sources/hypercore-sparse')
const topics = ['red', 'blue', 'green', 'yellow']
function initSparse (kappa, feed) {
kappa.use('topics', createTopicView())
kappa.source('sparsefeed' , createIndexer, {
feed,
onquery (topic, cb) {
kappa.api.topics.query(topic, cb)
}
})
}
function createTopicView () {
const topics = {}
const view = {
map (msgs, next) {
for (const msg of msgs) {
if (msg.value && msg.value.topic) {
const topic = msg.value.topic
topics[topic] = topics[topic] || []
topics[topic].push({ seq: msg.seq, key: msg.key })
}
}
next()
},
api: {
query (kappa, topic, cb) {
this.ready(() => {
if (!topics[topic]) return cb()
const feeds = topics[topic].reduce((agg, msg) => {
agg[msg.key] = agg[msg.key] || []
agg[msg.key].push(msg.seq)
return agg
}, {})
cb(null, feeds)
})
},
all (kappa, cb) {
this.ready(() => cb(null, topics))
},
requestTopic (kappa, topic) {
const flows = kappa.flowsByView(this.name)
for (let flow of flows) {
if (flow.source.requestTopic) {
flow.source.requestTopic(topic)
}
}
}
}
}
return view
}
tape.only('sparse', async t => {
// const cores = { local: null, remote: null }
var local, remote
var kloc, krem
var remotereq
const opts = {
valueEncoding: 'json',
sparse: true
}
await runAll([
cb => {
local = hypercore(ram, opts)
local.ready(cb)
},
cb => {
remote = hypercore(ram, local.key, opts)
remote.ready(cb)
},
cb => {
kloc = new Kappa()
krem = new Kappa()
initSparse(kloc, local)
initSparse(krem, remote)
cb()
},
cb => {
const batch = []
for (let i = 0; i <= 100; i++) {
let topic = topics[i % 4]
batch[i] = { topic, i }
}
local.append(batch, cb)
},
cb => replicate(local, remote, cb),
cb => logAll(cb, 'First replication done'),
cb => {
krem.api.topics.requestTopic('red')
setImmediate(cb)
},
cb => logAll(cb, 'Queried for "red"'),
cb => {
local.append([
{ topic: 'red' },
{ topic: 'blue' },
{ topic: 'red' }
], cb)
},
cb => setImmediate(cb),
cb => logAll(cb, 'Appended'),
cb => {
cb()
}
])
t.end()
async function logAll (cb, msg) {
console.log(`\n${msg}\n${'-'.repeat(msg.length)}\n`)
await runAll([
cb => log('local', local, kloc, cb),
cb => log('remote', remote, krem, cb)
])
cb()
}
})
function log (name, feed, kappa, cb) {
kappa.api.topics.all((err, res) => {
console.log(` ${name}
Key: ${shortkey(feed)}
Length: ${feed.length}
Downloaded: ${feed.downloaded()}
Topics: ${logTopics(res)}`)
cb()
})
function logTopics (topics) {
let str = ''
for (let [key, value] of Object.entries(topics)) {
str += `${key}: ${value.length} `
}
return str
}
}
function shortkey (feed) {
feed._label = feed.key.toString('hex').substring(0, 6) + '..'
return feed._label
}
function pad (str, i) {
if (str.length < i) str = str + ' '.repeat(i - str.length)
return str
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment