Skip to content

Instantly share code, notes, and snippets.

@ceremcem
Forked from jedi4ever/broker.js
Last active August 29, 2015 14:27
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ceremcem/2f7db58f0f62d91e4fb2 to your computer and use it in GitHub Desktop.
Save ceremcem/2f7db58f0f62d91e4fb2 to your computer and use it in GitHub Desktop.
XSUB/XPUB example in Node.js
// XSUB/XPUB broker script.
// Publishers connect to the XSUB endpoint and subscribers connect to the XPUB
// endpoint; this script relays publications one way and subscription
// requests the other way.

// The zmq binding must be loaded here: every socket below is created from it.
var zmq = require('zmq');

var pubListener = 'tcp://127.0.0.1:5555';
var subListener = 'tcp://127.0.0.1:5556';
var hwm = 1000;
var verbose = 0;

// The xsub listener is where pubs connect to
var subSock = zmq.socket('xsub');
subSock.identity = 'subscriber' + process.pid;
subSock.bindSync(subListener);

// The xpub listener is where subs connect to
var pubSock = zmq.socket('xpub');
pubSock.identity = 'publisher' + process.pid;
pubSock.setsockopt(zmq.ZMQ_SNDHWM, hwm);
// By default xpub only signals new subscriptions.
// Setting it to verbose = 1 will signal on every new subscribe.
pubSock.setsockopt(zmq.ZMQ_XPUB_VERBOSE, verbose);
pubSock.bindSync(pubListener);

// When we receive data on subSock, it means someone is publishing
subSock.on('message', function() {
  // Each frame of a multipart message arrives as its own argument; relay all
  // of them as one message so nothing after the first frame is lost.
  var frames = Array.prototype.slice.call(arguments);
  pubSock.send(frames);
});

// When pubSock receives a message, it is a subscribe/unsubscribe request
pubSock.on('message', function(data) {
  // The first byte is the flag: 0 means unsubscribe, anything else is
  // reported as subscribe
  var type = data[0] === 0 ? 'unsubscribe' : 'subscribe';
  // The channel name is the rest of the buffer
  var channel = data.slice(1).toString();
  console.log(type + ':' + channel);
  // Pass the request on to subSock, so it knows which channels to listen to
  subSock.send(data);
});
// Publisher script.
// Caveat: a publication sent before any subscriber is listening is simply
// discarded.
var zmq = require('zmq');
var sock = zmq.socket('pub');

// Address the pub socket connects to (the PUB -> XSUB side)
var xsubEndpoint = 'tcp://127.0.0.1:5556';
// The message is the channel prefix followed by the payload
var channelPrefix = 'MYCHANNEL ';
var payload = 'here is the info';

// Connect rather than bind
sock.connect(xsubEndpoint);
sock.send(channelPrefix + payload);
// Subscriber script: subscribes to MYCHANNEL and prints every message received.
var zmq = require('zmq');
var sock = zmq.socket('sub');

// Address the sub socket connects to (the SUB -> XPUB side)
var xpubEndpoint = 'tcp://127.0.0.1:5555';

// Print an incoming message as text.
// For the example publisher the text is 'MYCHANNEL' + a space + 'here is the info'.
function printMessage(data) {
  var text = data.toString();
  console.log(text);
}

// Only MYCHANNEL is subscribed to
sock.subscribe('MYCHANNEL');
sock.connect(xpubEndpoint);
sock.on('message', printMessage);
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment