Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save Nuhvi/04f7b57aeaeaf933b6ea6fd7925f613b to your computer and use it in GitHub Desktop.
Save Nuhvi/04f7b57aeaeaf933b6ea6fd7925f613b to your computer and use it in GitHub Desktop.
successful corestore.replicate + protomuxRPC
import Hyperswarm from 'hyperswarm'
import Corestore from 'corestore'
import ProtomuxRPC from 'protomux-rpc'
import Protomux from 'protomux'
import RAM from 'random-access-memory'
import createTestnet from '@hyperswarm/testnet'
import Hypercore from 'hypercore'
const testnet = await createTestnet()
const swarm_A = new Hyperswarm(testnet)
const store_A = new Corestore(RAM)
store_A.replicate = customReplicate.bind(store_A)
const core_A = store_A.get({ name: 'foo' })
await core_A.ready()
await core_A.append(['foo'])
swarm_A.on('connection', stream => {
stream = Protomux.from(stream)
console.log('swarm A connection callback')
store_A.replicate(stream)
const rpc = new ProtomuxRPC(stream)
rpc.respond('echo', req => req)
})
await swarm_A.listen()
const swarm_B = new Hyperswarm(testnet)
const store_B = new Corestore(RAM)
const core_B = store_B.get({ key: core_A.key })
core_B.ready()
store_B.replicate = customReplicate.bind(store_B)
swarm_B.on('connection', async stream => {
stream = Protomux.from(stream)
console.log('swarm B connection callback')
store_B.replicate(stream)
core_B.update().then(() => {
console.log('result:', core_B.length)
})
const rpc = new ProtomuxRPC(stream)
const response = await rpc.request('echo', Buffer.from('foo'))
console.log({ response })
})
swarm_B.joinPeer(swarm_A.keyPair.publicKey)
function customReplicate (isInitiator, opts = {}) {
const isExternal = isStream(isInitiator) || !!(opts && opts.stream)
const mux = Protomux.isProtomux(isInitiator) && isInitiator
const stream =
(mux && mux.stream) ||
Hypercore.createProtocolStream(isInitiator, {
...opts,
ondiscoverykey: discoveryKey => {
const core = this.get({ _discoveryKey: discoveryKey })
return core.ready().catch(safetyCatch)
}
})
const sessions = []
for (const core of this.cores.values()) {
if (!core.opened) continue // If the core is not opened, it will be replicated in preload.
const session = core.session()
sessions.push(session)
core.replicate(mux || stream)
}
const streamRecord = { stream, isExternal }
this._replicationStreams.push(streamRecord)
this._streamSessions.set(stream, sessions)
stream.once('close', () => {
this._replicationStreams.splice(
this._replicationStreams.indexOf(streamRecord),
1
)
this._streamSessions.delete(stream)
Promise.all(sessions.map(s => s.close())).catch(safetyCatch)
})
return stream
}
function isStream (s) {
return typeof s === 'object' && s && typeof s.pipe === 'function'
}
function safetyCatch () {}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment