@vasco-santos
Last active October 15, 2019 10:43
Pubsub base protocol proposal for libp2p refactor
'use strict'
const assert = require('assert')
const debug = require('debug')
const EventEmitter = require('events')
const errcode = require('err-code')
const PeerInfo = require('peer-info')
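const message = require('./message')
// Assumption: `verifySignature` (used in `validate`) is exported by a local ./message/sign module
const { verifySignature } = require('./message/sign')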
const Peer = require('./peer')
const utils = require('./utils')
/**
* PubsubBaseProtocol handles the peers and connections logic for pubsub routers
*/
class PubsubBaseProtocol extends EventEmitter {
/**
* @param {String} debugName
* @param {String} multicodec
* @param {PeerInfo} peerInfo
* @param {Object} registration
* @param {function} registration.register
* @param {function} registration.unregister
* @param {Object} [options]
* @param {boolean} [options.signMessages] if messages should be signed, defaults to true
* @param {boolean} [options.strictSigning] if message signing should be required, defaults to true
* @constructor
*/
constructor (debugName, multicodec, peerInfo, registration, options = {}) {
assert(debugName && typeof debugName === 'string', 'a debugname `string` is required')
assert(multicodec && typeof multicodec === 'string', 'a multicodec `string` is required')
assert(PeerInfo.isPeerInfo(peerInfo), 'peer info must be an instance of `peer-info`')
// Registration handling
assert(registration && typeof registration.register === 'function', 'a register function must be provided in registration')
assert(registration && typeof registration.unregister === 'function', 'a unregister function must be provided in registration')
super()
options = {
signMessages: true,
strictSigning: true,
...options
}
this.log = debug(debugName)
this.log.err = debug(`${debugName}:error`)
this.multicodec = multicodec
this.peerInfo = peerInfo
this.registration = registration
this.started = false
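if (options.signMessages) {
this.peerId = this.peerInfo.id
}
/**
* If message signing should be required for incoming messages
*
* @type {boolean}
*/
this.strictSigning = options.strictSigning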
/**
* Map of topics to which peers are subscribed
*
* @type {Map<string, Peer>}
*/
this.topics = new Map()
/**
* Map of peers.
*
* @type {Map<string, Peer>}
*/
this.peers = new Map()
}
/**
* Mounts the pubsub protocol onto the libp2p node and sends our
* subscriptions to every connected peer
* @returns {Promise}
*/
async start () {
if (this.started) {
return
}
this.log('starting')
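// register protocol with multicodec and handlers
// (handlers are bound so `this` still refers to this instance when they are invoked)
await this.registration.register(this.multicodec, {
onConnect: this._onPeerConnected.bind(this),
onDisconnect: this._onPeerDisconnected.bind(this)
})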
this.log('started')
this.started = true
}
/**
* Unmounts the pubsub protocol and shuts down every connection
* @returns {Promise}
*/
async stop () {
if (!this.started) {
throw errcode(new Error('not started yet'), 'ERR_NOT_STARTED_YET')
}
this.log('stopping')
// unregister protocol and handlers
await this.registration.unregister(this.multicodec)
this.peers.forEach((peer) => peer.close())
this.peers = new Map()
this.started = false
this.log('stopped')
}
/**
* Peer connected successfully with pubsub protocol.
* @private
* @param {PeerInfo} peerInfo peer info
* @param {Connection} conn connection to the peer
*/
_onPeerConnected (peerInfo, conn) {
const idB58Str = peerInfo.id.toB58String()
this.log('connected', idB58Str)
const peer = this._addPeer(new Peer(peerInfo))
peer.attachConnection(conn)
this._processMessages(idB58Str, conn, peer)
}
/**
* Peer disconnected.
* @private
* @param {PeerInfo} peerInfo peer info
* @param {Error} err error for connection end
*/
_onPeerDisconnected (peerInfo, err) {
const idB58Str = peerInfo.id.toB58String()
const peer = this.peers.get(idB58Str)
this.log('connection ended', idB58Str, err ? err.message : '')
// Nothing to remove if this peer was never tracked
if (peer) {
this._removePeer(peer)
}
}
/**
* Add a new connected peer to the peers map.
* @private
* @param {Peer} peer peer state
* @returns {Peer} the peer already tracked for this id, or the one just added
*/
_addPeer (peer) {
const id = peer.info.id.toB58String()
/*
Always use an existing peer.
If the other peer has already dialed us, we already have an established
link between the two. What might be missing is a Connection
specifically between us and that Peer.
*/
let existing = this.peers.get(id)
if (!existing) {
this.log('new peer', id)
this.peers.set(id, peer)
existing = peer
peer.once('close', () => this._removePeer(peer))
}
++existing._references
return existing
}
/**
* Remove a peer from the peers map if it has no references.
* @private
* @param {Peer} peer peer state
* @returns {Peer}
*/
_removePeer (peer) {
const id = peer.info.id.toB58String()
this.log('remove', id, peer._references)
// Only delete when no one else is referencing this peer.
if (--peer._references === 0) {
this.log('delete peer', id)
this.peers.delete(id)
}
return peer
}
/**
* Get a list of the peer-ids that are subscribed to one topic.
* @param {string} topic
* @returns {Array<string>}
*/
getPeersSubscribed (topic) {
if (!topic || typeof topic !== 'string') {
throw errcode(new Error('a string topic must be provided'), 'ERR_NOT_VALID_TOPIC')
}
return Array.from(this.peers.values())
.filter((peer) => peer.topics.has(topic))
.map((peer) => peer.info.id.toB58String())
}
/**
* Validates the given message. The signature will be checked for authenticity.
* @param {rpc.RPC.Message} message
* @returns {Promise<Boolean>}
*/
async validate (message) { // eslint-disable-line require-await
// If strict signing is on and we have no signature, abort
if (this.strictSigning && !message.signature) {
this.log('Signing required and no signature was present, dropping message:', message)
return Promise.resolve(false)
}
// Check the message signature if present
if (message.signature) {
return verifySignature(message)
} else {
return Promise.resolve(true)
}
}
/**
* Overriding the implementation of publish should handle the appropriate algorithms for the publish/subscriber implementation.
* For example, a Floodsub implementation might simply publish each message to each topic for every peer
* @abstract
* @param {Array<string>|string} topics
* @param {Array<any>|any} messages
* @returns {undefined}
*
*/
publish (topics, messages) {
throw errcode(new Error('publish must be implemented by the subclass'), 'ERR_NOT_IMPLEMENTED')
}
/**
* Overriding the implementation of subscribe should handle the appropriate algorithms for the publish/subscriber implementation.
* For example, a Floodsub implementation might simply send a message for every peer showing interest in the topics
* @abstract
* @param {Array<string>|string} topics
* @returns {undefined}
*/
subscribe (topics) {
// TODO: add to docs that implementations must call addSubscription()
throw errcode(new Error('subscribe must be implemented by the subclass'), 'ERR_NOT_IMPLEMENTED')
}
/**
* Overriding the implementation of unsubscribe should handle the appropriate algorithms for the publish/subscriber implementation.
* For example, a Floodsub implementation might simply send a message for every peer revoking interest in the topics
* @abstract
* @param {Array<string>|string} topics
* @returns {undefined}
*/
unsubscribe (topics) {
throw errcode(new Error('unsubscribe must be implemented by the subclass'), 'ERR_NOT_IMPLEMENTED')
}
/**
* Overriding the implementation of getTopics should handle the appropriate algorithms for the publish/subscriber implementation.
* Get the list of subscriptions the peer is subscribed to.
* @abstract
* @returns {Array<string>}
*/
getTopics () {
throw errcode(new Error('getTopics must be implemented by the subclass'), 'ERR_NOT_IMPLEMENTED')
}
/**
* Overriding the implementation of _processMessages should keep the connection open and is
* responsible for processing each RPC message received from other peers.
* @abstract
* @param {string} idB58Str peer id string in base58
* @param {Connection} conn connection
* @param {Peer} peer peer state
* @returns {undefined}
*
*/
_processMessages (idB58Str, conn, peer) {
throw errcode(new Error('_processMessages must be implemented by the subclass'), 'ERR_NOT_IMPLEMENTED')
}
}
module.exports = PubsubBaseProtocol
module.exports.message = message
module.exports.utils = utils
@jacobheun

As we go through this I'd like to think about registration in terms of Topology requests, libp2p/notes#13. I think registration could become registrar and that would end up being the TopologyRegistrar the Connection Manager and libp2p uses. It could be very basic for now, but we could eventually evolve that to allow for more complex Topology registration/interactions.

// Give me between 10 and 20 connections on the protocol
registrar.request(protocol, { min: 10, max: 20 })
//  Give me between 10% and 20% of connections (strings could be interpreted as percentages)
registrar.request(protocol, { min: '10', max: '20' })

Although it may be better to pass a Topology object to the request, and we could create a basic Topology to start with that just tries to maintain a range of connections on a given protocol.
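
A minimal sketch of what such a basic Topology might look like, assuming it only needs to track peers on one protocol and report how far the connection count is from the requested range. The `Topology` class and its `connect`, `disconnect` and `missing` members are hypothetical names used for illustration, not an existing libp2p API.

```javascript
'use strict'

// Hypothetical Topology: tracks the peers connected on one protocol and
// keeps the count inside the requested [min, max] range.
class Topology {
  constructor ({ min = 0, max = Infinity } = {}) {
    this.min = min
    this.max = max
    this.peers = new Set()
  }

  // Record a connected peer. Returns false when the topology is already full.
  connect (peerId) {
    if (this.peers.has(peerId)) return true
    if (this.peers.size >= this.max) return false
    this.peers.add(peerId)
    return true
  }

  // Forget a peer that disconnected.
  disconnect (peerId) {
    this.peers.delete(peerId)
  }

  // Number of extra connections still needed to reach the minimum.
  get missing () {
    return Math.max(0, this.min - this.peers.size)
  }
}
```

A registrar could then consult `missing` to decide whether to dial more peers on the protocol, and use the return value of `connect` to decide whether a new connection should be kept.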

@vasco-santos

Cool, I will consider this for the PR!

@vasco-santos

vasco-santos commented Oct 15, 2019

Hey 👋

Regarding your comment with the registrar, my previous idea was:

// register protocol with multicodec and handlers
await this.registration.register(this.multicodec, {
   onConnect: this._onPeerConnected,
   onDisconnect: this._onPeerDisconnected
})

// unregister before stop
await this.registration.unregister(this.multicodec)

With your comment:

registrar.request(protocol, { min: 10, max: 20 })

I am iterating as:

const handlers = {
  onConnect: this._onPeerConnected,
  onDisconnect: this._onPeerDisconnected
}

const topology = { min: 10, max: 20 }

// Subsystem start
await registrar.bind(this.multicodec, handlers, topology)

// Subsystem stop
await registrar.unbind(this.multicodec)

I didn't find request a good name because we need an opposite action, and when we make a "request" we usually attach an associated action to it (for example, an HTTP request).

Another option for the handlers would be for the registrar to be an EventEmitter. I am not sure which I prefer, though.
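
For comparison, a rough sketch of the EventEmitter variant, assuming the registrar emits one event per protocol and connection change. The `EventRegistrar` class, its methods and the `<protocol>:connect` / `<protocol>:disconnect` event names are made up for illustration, and the multicodec string is only an example value.

```javascript
'use strict'
const EventEmitter = require('events')

// Hypothetical registrar that emits connection events per protocol instead
// of receiving a handlers object. Event names are illustrative only.
class EventRegistrar extends EventEmitter {
  // Called by the connection layer when a peer connects on `protocol`.
  peerConnected (protocol, peerInfo, conn) {
    this.emit(`${protocol}:connect`, peerInfo, conn)
  }

  // Called by the connection layer when a peer disconnects from `protocol`.
  peerDisconnected (protocol, peerInfo, err) {
    this.emit(`${protocol}:disconnect`, peerInfo, err)
  }
}

// Subsystem side: subscribe on start, remove the listener on stop.
const registrar = new EventRegistrar()
const multicodec = '/floodsub/1.0.0' // example value
const seen = []
const onConnect = (peerInfo) => seen.push(peerInfo)

registrar.on(`${multicodec}:connect`, onConnect) // subsystem start
registrar.peerConnected(multicodec, 'peer-a', null)
registrar.removeListener(`${multicodec}:connect`, onConnect) // subsystem stop
registrar.peerConnected(multicodec, 'peer-b', null) // no longer observed
```

With this shape the subsystem has to keep a reference to each listener so it can remove it on stop, whereas the handlers-object approach lets a single unregister call drop everything for the protocol.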

@jacobheun
Copy link

I would go with .register and .unregister. The main thought behind request was that a system was asking for a certain amount of resources, but they're not necessarily going to get them if they ask for an invalid amount, but I think we can potentially throw an error on register about asking for an invalid Topology.
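
A minimal sketch of that idea, assuming a topology is just a `{ min, max }` pair of non-negative integers. The `Registrar` class, the validation rules and the `ERR_INVALID_TOPOLOGY` code are assumptions for illustration, not an agreed API.

```javascript
'use strict'

// Hypothetical registrar: `register` rejects an invalid topology up front.
class Registrar {
  constructor () {
    this.registrations = new Map()
  }

  register (multicodec, handlers, topology = {}) {
    const { min = 0, max = Infinity } = topology
    // Valid when min is a non-negative integer and max is an integer
    // (or unbounded) that is not smaller than min.
    const valid = Number.isInteger(min) && min >= 0 &&
      (max === Infinity || Number.isInteger(max)) && max >= min
    if (!valid) {
      const err = new Error(`invalid topology: min=${min}, max=${max}`)
      err.code = 'ERR_INVALID_TOPOLOGY' // illustrative error code
      throw err
    }
    this.registrations.set(multicodec, { handlers, topology: { min, max } })
  }

  // Returns true if a registration existed for the multicodec.
  unregister (multicodec) {
    return this.registrations.delete(multicodec)
  }
}
```

Throwing at registration time means a subsystem learns about a bad request during its own start, instead of silently receiving fewer connections than it asked for.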
