Skip to content

Instantly share code, notes, and snippets.

@haadcode
Last active November 24, 2016 06:28
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save haadcode/52948777e9f2f69cd01bdcd9634ee324 to your computer and use it in GitHub Desktop.
Save haadcode/52948777e9f2f69cd01bdcd9634ee324 to your computer and use it in GitHub Desktop.
'use strict'
const PubsubMessageStream = require('ipfs-pubsub-message-stream')
class Pubsub {
constructor (send: Function) {
this._subscriptions = {}
this._send = send
}
subscribe (topic: string, options: Object = {}) {
// We don't allow multiple subscriptions at the moment
if (this._subscriptions[topic])
throw new Error(`Already subscribed to '${topic}'`)
// Command
const command = {
path: 'pubsub/sub',
args: [topic],
options: Object.assign(options, { discover: false })
}
// Start the request
const {res, req} = await this._send(command)
// Convert the response from HTTP to Pubsub messages
let stream = res.pipe(new PubsubMessageStream())
// Add the request to the active subscriptions
return this._addSubscription(topic, req, stream)
}
publish (topic: string, data: Buffer) {
return await this._send({
path: 'pubsub/pub',
args: [topic, buf]
})
}
ls () {
const res = await this._send({path: 'pubsub/ls'})
return res.Strings || []
}
peers (topic: string) {
if (!this._subscriptions[topic])
throw new Error(`Not subscribed to '${topic}'`)
const res = await this._send({
path: 'pubsub/peers',
args: [topic]
})
return res.Strings || []
}
_addSubscription (topic: string, connection: IncomingMessage, output: PubsubMessageStream) {
// Add a cancel method so that the subscription can be cleanly cancelled
output.cancel = () => this._removeSubscription(topic)
this._subscriptions[topic] = { connection: connection, output: output }
return output
}
_removeSubscription (topic: string) {
if (!this._subscriptions[topic])
throw new Error(`Not subscribed to ${topic}`)
subscriptions[topic].connection.abort()
subscriptions[topic].output.end()
delete subscriptions[topic]
})
}
module.exports = Pubsub
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment