Skip to content

Instantly share code, notes, and snippets.

@boltzj
Last active January 19, 2017 13:26
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save boltzj/586f654358da2f56cc4d358870a5b148 to your computer and use it in GitHub Desktop.
Save boltzj/586f654358da2f56cc4d358870a5b148 to your computer and use it in GitHub Desktop.
Publisher.js
var zmq = require('zmq');
/**
 * Publish Message to Subscriber
 *
 * Binds a ZeroMQ PUB socket on TCP port 8688 and, once bound, sends one
 * JSON-encoded message of the form {"id": <n>} every second, where <n> is a
 * random integer between 1 and 5 inclusive.
 */
var publisher = zmq.socket('pub');
publisher.bind('tcp://*:8688', function(err) {
  if (err) {
    return console.log(err);
  }
  console.log('Listening on 8688…');
  // TODO: Send message to subscriber
  setInterval(function loop() {
    var rand = {
      // Math.random() is in [0, 1) and may return exactly 0, so
      // Math.ceil(Math.random() * 5) could produce 0. floor(...) + 1 always
      // yields an integer in 1..5.
      id: Math.floor(Math.random() * 5) + 1
    };
    publisher.send(JSON.stringify(rand));
  }, 1000);
});
/**
 * Handle Subscriber Responses
 *
 * Binds a ZeroMQ PULL socket on TCP port 8888 and logs every message received
 * on it as a string.
 */
var pull = zmq.socket('pull');
pull.bind('tcp://*:8888', function(err) {
  // Report a failed bind instead of silently ignoring it, and do not install
  // the message handler on a socket that never bound.
  if (err) {
    return console.log(err);
  }
  pull.on('message', function(msg) {
    // TODO: Handle Message here
    console.log(msg.toString());
  });
});
/**
 * Interrupt (Ctrl-C) handling: close both sockets, then exit the process.
 */
function shutdown() {
  publisher.close();
  pull.close();
  process.exit();
}
process.on('SIGINT', shutdown);
import json
import sys
import random
import time
import zmq
# Subscriber script: listens for {"id": n} JSON messages from the publisher,
# answers over a PUSH socket whenever the id matches this process's own random
# id, and exits once no matching message has been seen for `ttl_seconds`.

sub_port = "8688"
push_port = "8888"

# Seconds without a matching message before the subscriber gives up
ttl_seconds = 20
# Pause between polls when no message is waiting (33ms)
poll_interval = 0.033

# Init context (shared by both sockets)
context = zmq.Context()

# Subscriber socket
sub = context.socket(zmq.SUB)

# Socket to respond to publisher; created on the shared context rather than
# on a second, throwaway zmq.Context()
push = context.socket(zmq.PUSH)

# Connect the subscriber socket and its push (response) socket
sub.connect("tcp://localhost:%s" % sub_port)
push.connect("tcp://localhost:%s" % push_port)

# The publisher emits ids 1..5, so the upper bound must be inclusive
# (randrange(1, 5) could never pick 5).
rand = random.randint(1, 5)
print('Random id: %s' % rand)

# Empty prefix: subscribe to every message
# topicfilter = "%datasetId%"
sub.setsockopt(zmq.SUBSCRIBE, b'')

ttl = time.time() + ttl_seconds

while True:
    try:
        # Read from publisher socket without blocking; raises zmq.Again when
        # nothing is queued
        buf = sub.recv(zmq.DONTWAIT).decode('utf-8')

        # Parse received message
        msg = json.loads(buf)

        # Message Processing
        if msg["id"] == rand:
            print('Message Id %s! Update TTL' % rand)

            # Send response to Publisher
            push.send_json({"id": rand, "message": "Hello from %s" % rand})

            # Update TTL
            ttl = time.time() + ttl_seconds

    except zmq.Again:
        # No message: either the TTL has run out or we wait and poll again
        if time.time() > ttl:
            print('TTL exceeded.. Bye bye')
            sys.exit(0)
        else:
            time.sleep(poll_interval)

    # Error Handling
    except zmq.ZMQError as e:
        if e.errno == zmq.ETERM:
            break  # Socket interrupted
        else:
            raise

    except (ValueError, KeyError, TypeError, AttributeError) as e:
        # ValueError covers invalid JSON and undecodable bytes; KeyError and
        # TypeError cover a payload that is not an object with an "id" key.
        print("Malformed Message: %s" % type(e).__name__)

    except KeyboardInterrupt:
        print('\n', 'Catch KeyboardInterrupt...')
        sys.exit(1)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment