Navigation Menu

Skip to content

Instantly share code, notes, and snippets.

@jedi4ever
Last active October 1, 2021 13:39
Show Gist options
  • Star 15 You must be signed in to star a gist
  • Fork 8 You must be signed in to fork a gist
  • Save jedi4ever/6591925 to your computer and use it in GitHub Desktop.
Save jedi4ever/6591925 to your computer and use it in GitHub Desktop.
xsub , xpub example in nodejs
var pubListener = 'tcp://127.0.0.1:5555';
var subListener = 'tcp://127.0.0.1:5556';
var hwm = 1000;
var verbose = 0;
// The xsub listener is where pubs connect to
var subSock = zmq.socket('xsub');
subSock.identity = 'subscriber' + process.pid;
subSock.bindSync(subListener);
// The xpub listener is where subs connect to
var pubSock = zmq.socket('xpub');
pubSock.identity = 'publisher' + process.pid;
pubSock.setsockopt(zmq.ZMQ_SNDHWM, hwm);
// By default xpub only signals new subscriptions
// Settings it to verbose = 1 , will signal on every new subscribe
pubSock.setsockopt(zmq.ZMQ_XPUB_VERBOSE, verbose);
pubSock.bindSync(pubListener);
// When we receive data on subSock , it means someone is publishing
subSock.on('message', function(data) {
// We just relay it to the pubSock, so subscribers can receive it
pubSock.send(data);
});
// When Pubsock receives a message , it's subscribe requests
pubSock.on('message', function(data, bla) {
// The data is a slow Buffer
// The first byte is the subscribe (1) /unsubscribe flag (0)
var type = data[0]===0 ? 'unsubscribe' : 'subscribe';
// The channel name is the rest of the buffer
var channel = data.slice(1).toString();
console.log(type + ':' + channel);
// We send it to subSock, so it knows to what channels to listen to
subSock.send(data);
});
// Beware if pub is started before a subcriber listening, it just goes into /dev/null space
var zmq = require('zmq');
var sock = zmq.socket('pub');
// Instead of binding our pub socket, we connect to the PUB -> XSUB
sock.connect('tcp://127.0.0.1:5556');
sock.send('MYCHANNEL ' + 'here is the info');
var zmq = require('zmq');
var sock = zmq.socket('sub');
// We subscribe to MYCHANNEL only
sock.subscribe('MYCHANNEL');
// We connect our sub -> XPUB channel
sock.connect('tcp://127.0.0.1:5555');
// When a message comes,
sock.on('message', function(data) {
// data is a SlowBuffer
// data contains : 'MYCHANNEL' + aspace + 'here is the info'
console.log(data.toString());
});
@srigi
Copy link

srigi commented Aug 11, 2014

Thank you very much, you saved my day. It is shameful that XPUB/XSUB example is missing in https://github.com/imatix/zguide/tree/master/examples/
When there will be some time, I will probably send PR to them.

@machinekoder
Copy link

Thanks! I have a few questions to your design:

You set the identity on the xsub socket and forward messages received on xpub directly to the xsub socket. Does that mean that ZMQ exposes subscriptions also on the xsub socket? How does it behave if you have multiple cliens subscribing and unsubscribing, does ZMQ use the identity to keep track of wheter topic messages should be sent or not? Because if this would not be the case a client could unsubscribe other clients connected to the broker.

@darobin
Copy link

darobin commented Apr 25, 2017

For what it's worth, this will not work if multipart messages are exchanged (typically done with sock.send([array, of, stuff]) in Node). See JustinTulloss/zeromq.node#412 for more details. The problem is that when using that, send() expects an array but on('message') receives a split out list of arguments. That is to say, instead of:

subSock.on('message', function(data) {
  // We just relay it to the pubSock, so subscribers can receive it
 pubSock.send(data);
});

you need

subSock.on('message', (...args) => pubSock.send(args));

and conversely on the other socket.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment