@jedi4ever
Last active October 1, 2021 13:39
XSUB/XPUB example in Node.js
var zmq = require('zmq');
var pubListener = 'tcp://127.0.0.1:5555';
var subListener = 'tcp://127.0.0.1:5556';
var hwm = 1000;
var verbose = 0;
// The xsub listener is where pubs connect to
var subSock = zmq.socket('xsub');
subSock.identity = 'subscriber' + process.pid;
subSock.bindSync(subListener);
// The xpub listener is where subs connect to
var pubSock = zmq.socket('xpub');
pubSock.identity = 'publisher' + process.pid;
pubSock.setsockopt(zmq.ZMQ_SNDHWM, hwm);
// By default xpub only signals the first subscription to a channel
// Setting verbose = 1 signals every subscribe, including duplicates
pubSock.setsockopt(zmq.ZMQ_XPUB_VERBOSE, verbose);
pubSock.bindSync(pubListener);
// When we receive data on subSock, it means someone is publishing
subSock.on('message', function(data) {
  // We just relay it to the pubSock, so subscribers can receive it
  pubSock.send(data);
});
// When pubSock receives a message, it is a subscribe or unsubscribe request
pubSock.on('message', function(data) {
  // The data is a Buffer
  // The first byte is the flag: subscribe (1) or unsubscribe (0)
  var type = data[0] === 0 ? 'unsubscribe' : 'subscribe';
  // The channel name is the rest of the buffer
  var channel = data.slice(1).toString();
  console.log(type + ':' + channel);
  // We send it to subSock, so the publishers know which channels are wanted
  subSock.send(data);
});
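The subscription frames handled above have a simple layout: one flag byte, then the channel name. A minimal sketch of decoding them in isolation, assuming a hypothetical `parseSubscription` helper that is not part of the gist or of the zmq module:

```javascript
// Hypothetical helper, not part of the gist: decode one XPUB subscription frame.
function parseSubscription(frame) {
  if (!Buffer.isBuffer(frame) || frame.length === 0) {
    throw new TypeError('expected a non-empty Buffer');
  }
  return {
    // First byte: 1 = subscribe, 0 = unsubscribe
    type: frame[0] === 0 ? 'unsubscribe' : 'subscribe',
    // Remaining bytes: the channel name
    channel: frame.slice(1).toString()
  };
}

// Example frames, built by hand for illustration
var subFrame = Buffer.concat([Buffer.from([1]), Buffer.from('MYCHANNEL')]);
var unsubFrame = Buffer.concat([Buffer.from([0]), Buffer.from('MYCHANNEL')]);

var sub = parseSubscription(subFrame);
var unsub = parseSubscription(unsubFrame);
console.log(sub.type + ':' + sub.channel);     // subscribe:MYCHANNEL
console.log(unsub.type + ':' + unsub.channel); // unsubscribe:MYCHANNEL
```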
// Beware: if the pub sends before a subscriber is listening, the message just goes into /dev/null space
var zmq = require('zmq');
var sock = zmq.socket('pub');
// Instead of binding our pub socket, we connect it to the XSUB listener
sock.connect('tcp://127.0.0.1:5556');
sock.send('MYCHANNEL ' + 'here is the info');
var zmq = require('zmq');
var sock = zmq.socket('sub');
// We subscribe to MYCHANNEL only
sock.subscribe('MYCHANNEL');
// We connect our sub socket to the XPUB listener
sock.connect('tcp://127.0.0.1:5555');
// When a message comes in,
sock.on('message', function(data) {
  // data is a Buffer
  // data contains: 'MYCHANNEL' + a space + 'here is the info'
  console.log(data.toString());
});
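
ZeroMQ subscriptions are byte-prefix filters, which is why subscribing to 'MYCHANNEL' delivers a message that starts with 'MYCHANNEL ' followed by the payload. A minimal sketch of that matching rule and of splitting the channel from the payload; `matchesSubscription` and `splitMessage` are hypothetical helpers for illustration, not part of the gist or of the zmq module:

```javascript
// A message matches when its first bytes equal the subscribed topic.
function matchesSubscription(topic, message) {
  var t = Buffer.from(topic);
  var m = Buffer.from(message);
  return m.length >= t.length && m.slice(0, t.length).equals(t);
}

// Split 'CHANNEL payload' at the first space, the convention the publisher above uses.
function splitMessage(data) {
  var text = data.toString();
  var i = text.indexOf(' ');
  return i === -1
    ? { channel: text, payload: '' }
    : { channel: text.slice(0, i), payload: text.slice(i + 1) };
}

var msg = Buffer.from('MYCHANNEL here is the info');
console.log(matchesSubscription('MYCHANNEL', msg));  // true
console.log(matchesSubscription('MYCHANNEL2', msg)); // false
console.log(matchesSubscription('MY', msg));         // true: prefix match, not exact match
console.log(splitMessage(msg).payload);              // here is the info
```

Because the match is on a prefix, a subscriber to 'MY' would also receive 'MYCHANNEL' traffic, so channel names that share a prefix need care.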

darobin commented Apr 25, 2017

For what it's worth, this will not work if multipart messages are exchanged (typically done with sock.send([array, of, stuff]) in Node). See JustinTulloss/zeromq.node#412 for more details. The problem is that send() expects a multipart message as an array, but the on('message') handler receives its frames as separate arguments. That is to say, instead of:

subSock.on('message', function(data) {
  // We just relay it to the pubSock, so subscribers can receive it
  pubSock.send(data);
});

you need

subSock.on('message', (...args) => pubSock.send(args));

and conversely on the other socket.
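
A minimal sketch of a relay that handles both directions this way. The `relay` helper is hypothetical, and the stand-in sockets below only mimic the `on('message')` / `send()` behaviour described above so the sketch runs without zmq installed:

```javascript
// Hypothetical helper: forward every frame of each message from one socket to another.
function relay(from, to) {
  from.on('message', (...frames) => {
    // The handler receives one argument per frame; send() wants them back as an array.
    to.send(frames);
  });
}

// With the sockets from the gist this would be used as:
//   relay(subSock, pubSock); // published messages go to subscribers
//   relay(pubSock, subSock); // subscription requests go to publishers

// Stand-in sockets for illustration only
var EventEmitter = require('events').EventEmitter;
var fakeSub = new EventEmitter();
var sent = [];
var fakePub = { send: function(frames) { sent.push(frames); } };

relay(fakeSub, fakePub);
fakeSub.emit('message', Buffer.from('MYCHANNEL'), Buffer.from('here is the info'));
console.log(sent[0].map(String)); // [ 'MYCHANNEL', 'here is the info' ]
```

Single-frame messages still work, since they arrive as one argument and are sent as an array of one frame.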