Skip to content

Instantly share code, notes, and snippets.

@abhijitmamarde
Created January 31, 2019 15:42
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save abhijitmamarde/af5859e4b81c0b178fa14815b1cd131f to your computer and use it in GitHub Desktop.
Save abhijitmamarde/af5859e4b81c0b178fa14815b1cd131f to your computer and use it in GitHub Desktop.
pub-sub using zmq and multiprocess module
import zmq
import baker
from multiprocessing import Process
@baker.command
def start(name, topicfilter="10001", port="5566"):
print(f"listening pub-server at port:{int(port)}; topic:{topicfilter}")
# Socket to talk to server
context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.setsockopt_string(zmq.SUBSCRIBE, topicfilter)
socket.connect("tcp://localhost:%s" % port)
# Process 5 updates
total_value = 0
update_nbr = 0
while True:
string = socket.recv_string()
print("recevied:", string)
topic, messagedata = string.split()
total_value += int(messagedata)
update_nbr += 1
print(topic, messagedata)
print(f"{name} avg value for topic {topicfilter} was {total_value / update_nbr}")
@baker.command
def run_clients(client_names, topicfilter="10001", port="5566"):
client_names = client_names.split(",")
for i, name in enumerate(client_names):
Process(target=start, args=(name, str(int(topicfilter)+i), port)).start()
if __name__ == "__main__":
baker.run()
import zmq
import random
import time
import baker
@baker.command
def start(port="5566"):
print(f"running pub-server on: {int(port)}")
context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind("tcp://*:%s" % port)
while True:
topic = random.randrange(9999, 10005)
messagedata = random.randrange(1, 215) - 80
print("%d %d" % (topic, messagedata))
socket.send_string("%d %d" % (topic, messagedata))
time.sleep(1)
if __name__ == "__main__":
baker.run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment