Skip to content

Instantly share code, notes, and snippets.

@lejeunerenard
Last active May 26, 2023 22:02
Show Gist options
  • Save lejeunerenard/8d2159ab79c1054e58faaaa4d761ca09 to your computer and use it in GitHub Desktop.
Save lejeunerenard/8d2159ab79c1054e58faaaa4d761ca09 to your computer and use it in GitHub Desktop.
Hyperbee `db.watch()` example w/ different replications
import Hyperbee from 'hyperbee'
import Corestore from 'corestore'
import RAM from 'random-access-memory'
// Writer side: a Corestore on random-access-memory storage holding the core
// that the writer Hyperbee appends to. The core is looked up by name.
const writerStore = new Corestore(RAM)
const writerCore = await writerStore.get({ name: 'hyperbee-core' })
// Wait for the writer core before reading writerCore.key below
// (presumably the key is only populated once the core is ready — TODO confirm).
await writerCore.ready()
// Three independent stores, each opening a core by the writer core's key.
// Each one backs a differently-configured watcher later in the script.
const watchAllStore = new Corestore(RAM)
const watchAllCore = await watchAllStore.get(writerCore.key)
const watchPeriodicallyStore = new Corestore(RAM)
const watchPeriodicallyCore = await watchPeriodicallyStore.get(writerCore.key)
const watchRangeStore = new Corestore(RAM)
const watchRangeCore = await watchRangeStore.get(writerCore.key)
// Ready the three watcher cores concurrently
await Promise.all([watchAllCore, watchPeriodicallyCore, watchRangeCore].map((c) => c.ready()))
// Hyperbee setup
// One options object is shared by all four Hyperbee instances so the writer
// and every watcher use the same key and value encodings.
const hyperbeeOpts = {
keyEncoding: 'utf8',
valueEncoding: 'json'
}
// `writer` is the only instance this script calls put() on; the other three
// wrap the cores opened by key and are only watched.
const writer = new Hyperbee(writerCore, hyperbeeOpts)
const watchAll = new Hyperbee(watchAllCore, hyperbeeOpts)
const watchPeriodically = new Hyperbee(watchPeriodicallyCore, hyperbeeOpts)
const watchRange = new Hyperbee(watchRangeCore, hyperbeeOpts)
// Each watcher logs, on every 'update' event:
//   1. the previous and current snapshot versions plus the core's contiguousLength
//   2. how many blocks of the core are available locally
// The versions are read before the first await, matching the moment the event fired.

// Watcher with no range on the `watchAll` instance.
const watchAllWatcher = watchAll.watch()
watchAllWatcher.on('update', async () => {
  const prevVersion = watchAllWatcher.previous.version
  const currentVersion = watchAllWatcher.current.version
  const { contiguousLength } = await watchAllCore.info()
  console.log('watchAll update\n prev version', prevVersion, 'current version', currentVersion, 'watchAllCore contiguousLength', contiguousLength)

  const blockCount = await numberOfBlocks(watchAllCore)
  console.log('numberOfBlocks(watchAllCore)', blockCount)
})

// Watcher with no range on the `watchPeriodically` instance.
const watchPeriodicallyWatcher = watchPeriodically.watch()
watchPeriodicallyWatcher.on('update', async () => {
  const prevVersion = watchPeriodicallyWatcher.previous.version
  const currentVersion = watchPeriodicallyWatcher.current.version
  const { contiguousLength } = await watchPeriodicallyCore.info()
  console.log('watchPeriodically update\n prev version', prevVersion, 'current version', currentVersion, 'watchPeriodicallyCore contiguousLength', contiguousLength)

  const blockCount = await numberOfBlocks(watchPeriodicallyCore)
  console.log('numberOfBlocks(watchPeriodicallyCore)', blockCount)
})

// Watcher restricted to the key range ['boop', 'boop0') on the `watchRange` instance.
const watchRangeWatcher = watchRange.watch({ gte: 'boop', lt: 'boop0' })
watchRangeWatcher.on('update', async () => {
  const prevVersion = watchRangeWatcher.previous.version
  const currentVersion = watchRangeWatcher.current.version
  const { contiguousLength } = await watchRangeCore.info()
  console.log('watchRange update\n prev version', prevVersion, 'current version', currentVersion, 'watchRangeCore contiguousLength', contiguousLength)

  const blockCount = await numberOfBlocks(watchRangeCore)
  console.log('numberOfBlocks(watchRangeCore)', blockCount)
})
// Connect the watchAll and watchRange stores to the writer store BEFORE any
// writes, so both replication streams exist while the puts below happen.
replicate(writerStore, watchAllStore)
replicate(writerStore, watchRangeStore)
// Append events
// Log the writer's version before the first put and after each put.
// Only the second put ('boop') falls inside the watchRange watcher's range.
console.log('put 0', writer.version)
await writer.put('beep', 1)
console.log('put 1', writer.version)
// Pause between writes (presumably to let replication and watchers run — TODO confirm)
await asyncTimeout(1)
await writer.put('boop', 1)
console.log('put 2', writer.version)
await asyncTimeout(1)
await writer.put('beep', 2)
console.log('put 3', writer.version)
await asyncTimeout(1)
await writer.put('beep', 3)
console.log('put 4', writer.version)
await asyncTimeout(1)
// Watch Periodically replication
// This store is connected only after all four puts have been made.
replicate(writerStore, watchPeriodicallyStore)
await asyncTimeout(1)
// Final tally of how many blocks each watcher core has locally.
console.log('end numberOfBlocks(watchAllCore)', await numberOfBlocks(watchAllCore))
console.log('end numberOfBlocks(watchPeriodicallyCore)', await numberOfBlocks(watchPeriodicallyCore))
console.log('end numberOfBlocks(watchRangeCore)', await numberOfBlocks(watchRangeCore))
/**
 * Wire two stores together by piping their replication streams into each other.
 *
 * `a` opens its stream with `true` as the first argument and `b` with `false`.
 * Both streams get `keepAlive: false` unless `opts` overrides it. Stream errors
 * are logged to the console rather than thrown.
 *
 * @param {object} a - Store whose `replicate(true, options)` stream is created first.
 * @param {object} b - Store whose `replicate(false, options)` stream is created second.
 * @param {object} [opts] - Extra stream options merged over the defaults.
 * @returns {Array} The pair `[streamOfA, streamOfB]`.
 */
function replicate (a, b, opts) {
  // Build a fresh options object per stream so the two never share one.
  const streamOptions = () => ({ keepAlive: false, ...opts })

  const initiatorStream = a.replicate(true, streamOptions())
  const responderStream = b.replicate(false, streamOptions())

  initiatorStream.on('error', (err) => console.error(`replication stream error (initiator): ${err}`))
  responderStream.on('error', (err) => console.error(`replication stream error (responder): ${err}`))

  // Full duplex loop: a -> b -> a
  const backToInitiator = initiatorStream.pipe(responderStream)
  backToInitiator.pipe(initiatorStream)

  return [initiatorStream, responderStream]
}
/**
 * Return a promise that fulfills after the given number of seconds.
 *
 * @param {number} seconds - Delay in seconds; converted to milliseconds for setTimeout.
 * @returns {Promise<void>} Fulfills with `undefined` once the timer fires.
 */
function asyncTimeout (seconds) {
  // The Promise constructor only accepts an executor, so the delay must be
  // handed to setTimeout inside it. Passing setTimeout itself as the executor
  // would call it as setTimeout(resolve, reject) and drop the delay entirely.
  return new Promise((resolve) => setTimeout(resolve, seconds * 1000))
}
/**
 * Count how many block indexes in `[0, core.length)` the core reports having.
 *
 * Indexes are checked one at a time, in ascending order, and `core.length`
 * is re-read on every iteration.
 *
 * @param {{ length: number, has: function(number): (boolean|Promise<boolean>) }} core
 *   Object exposing a `length` and a `has(index)` check.
 * @returns {Promise<number>} Number of indexes for which `has` was truthy.
 */
async function numberOfBlocks (core) {
  let present = 0
  let index = 0
  while (index < core.length) {
    const isAvailable = await core.has(index)
    if (isAvailable) {
      present += 1
    }
    index += 1
  }
  return present
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment