Skip to content

Instantly share code, notes, and snippets.

@vitiral
Last active September 30, 2022 10:03
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 3 You must be signed in to fork a gist
  • Save vitiral/b159a579eef23564c06e to your computer and use it in GitHub Desktop.
Save vitiral/b159a579eef23564c06e to your computer and use it in GitHub Desktop.
ZMQ python communications

ZMQ python

Demonstrates some simple zmq communication

Similar to this example but usable more generally.

Quoting http://zeromq.org/area:faq:

Can I subscribe to messages using regex or wildcards?

  • No. Prefix matching only.

This implements simple prefix matching and lets you choose which data-packing library to use (I recommend msgpack, json, or pickle).

import re
import json
try:
import msgpack
MPACK = True
except ImportError:
MPACK = False
# Splits a packed frame into (name, payload) at the FIRST NUL byte.
# The name group excludes NUL so that 0x00 bytes inside the payload are never
# absorbed into the name, and DOTALL lets the payload contain 0x0a bytes
# (without it `.` stops at a newline and the payload is silently truncated).
UNPACK = re.compile(b'([^\x00]*)\x00(.*)', re.DOTALL).match


class Packer:
    '''Pack and unpack ``(name, data)`` pairs as ``name + NUL + payload``.

    The name is UTF-8 encoded and placed first; the payload is whatever the
    configured packer's ``dumps`` produces for ``data``.

    ``packer`` may be any object exposing ``dumps(obj)`` and ``loads(bytes)``.
    When it is omitted, msgpack is used if it was importable, otherwise json.

    NOTE(review): a packer such as pickle executes arbitrary code when
    loading; only use it when every peer on the socket is trusted.
    '''

    def __init__(self, packer=None):
        if packer is None:
            self.packer = msgpack if MPACK else json
        else:
            self.packer = packer

    def loads(self, data):
        '''Split a packed frame and return ``(name, unpacked_data)``.

        Raises ValueError if ``data`` contains no NUL separator.
        '''
        match = UNPACK(data)
        if match is None:
            raise ValueError('packed message has no NUL name separator')
        name, payload = match.groups()
        return name.decode(), self.packer.loads(payload)

    def dumps(self, name, data):
        '''Return ``name`` and ``data`` packed into a single bytes frame.

        Raises ValueError if ``name`` contains a NUL character, since the
        frame could not be split back into the same name and payload.
        '''
        if '\x00' in name:
            raise ValueError('name must not contain the NUL character')
        payload = self.packer.dumps(data)
        # Text-based packers (e.g. json) return str; the frame must be bytes.
        if isinstance(payload, str):
            payload = payload.encode()
        return b'\x00'.join((name.encode(), payload))
class zsocket(Packer):
    '''Wrap an existing zmq socket so that named, packed messages can be
    sent and received through it.

    Each message travels as a single frame built by ``Packer.dumps``.
    Names must NOT contain the NUL (0x00) character.
    '''

    def __init__(self, socket, packer=None):
        super().__init__(packer)
        self.socket = socket

    def send(self, name, data):
        '''Pack ``data`` under ``str(name)`` and send it on the socket.'''
        frame = self.dumps(str(name), data)
        self.socket.send(frame)

    def recv(self):
        '''Receive one frame and return it as ``(name, data)``.'''
        frame = self.socket.recv()
        return self.loads(frame)
import zmq
import random
import sys
import time
import zmsg
# Publisher demo: bind a PUB socket and publish a random (topic, value)
# pair every 0.2 seconds until interrupted.

# The first command-line argument, if given, overrides the default port.
port = sys.argv[1] if len(sys.argv) > 1 else "5556"
int(port)  # result unused: raises ValueError early for a non-numeric port

context = zmq.Context()
socket = context.socket(zmq.PUB)
endpoint = "tcp://*:%s" % port
socket.bind(endpoint)
zsocket = zmsg.zsocket(socket)

while True:
    # Topic is drawn from 9999..10004, value from -79..134.
    topic = random.randrange(9999, 10005)
    messagedata = random.randrange(1, 215) - 80
    line = "%d %d" % (topic, messagedata)
    print(line)
    zsocket.send(topic, messagedata)
    time.sleep(0.2)
import sys
import zmq
import zmsg
# Subscriber demo: connect to one or two publishers, receive a fixed number
# of updates for a single topic and print their average value.

# Optional command-line arguments: first publisher port, second publisher port.
port = "5556"
if len(sys.argv) > 1:
    port = sys.argv[1]
    int(port)  # result unused: raises ValueError early for a non-numeric port
if len(sys.argv) > 2:
    port1 = sys.argv[2]
    int(port1)

# Socket to talk to server
context = zmq.Context()
socket = context.socket(zmq.SUB)
zsocket = zmsg.zsocket(socket)

print("Collecting updates from weather server...")
socket.connect("tcp://localhost:%s" % port)
if len(sys.argv) > 2:
    socket.connect("tcp://localhost:%s" % port1)

# Subscribe to zipcode, default is NYC, 10001.
# Subscription is a prefix match on the frame, which starts with the name.
topicfilter = "10001"
socket.setsockopt(zmq.SUBSCRIBE, topicfilter.encode())

# Process a fixed number of updates
update_count = 5
total_value = 0
for update_nbr in range(update_count):
    topic, messagedata = zsocket.recv()
    total_value += messagedata
    print(topic, messagedata)

# Divide by the number of updates received; the loop index ends at
# update_count - 1, so using it here would overstate the average.
print("Average messagedata value for topic '%s' was %dF" %
      (topicfilter, total_value / update_count))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment