Skip to content

Instantly share code, notes, and snippets.

@minrk
Created March 2, 2013 02:09
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save minrk/5069344 to your computer and use it in GitHub Desktop.
Save minrk/5069344 to your computer and use it in GitHub Desktop.
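"""FIFO queue between two threads using ZeroMQ PUSH/PULL sockets over IPC.

Written for http://stackoverflow.com/questions/15166039

The sender starts first and the receiver only binds five seconds later, so
the first few messages are consumed together, in order, once the receiver is
up; after that each message is consumed as it is sent.

Typical output: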
[S] 0.0 sending msg 0
[S] 1.0 sending msg 1
[S] 2.0 sending msg 2
[S] 3.0 sending msg 3
[S] 4.0 sending msg 4
[S] 5.0 sending msg 5
[R] 5.2 consuming msg 0
[R] 5.2 consuming msg 1
[R] 5.2 consuming msg 2
[R] 5.2 consuming msg 3
[R] 5.2 consuming msg 4
[R] 5.2 consuming msg 5
[S] 6.0 sending msg 6
[R] 6.0 consuming msg 6
[S] 7.0 sending msg 7
[R] 7.0 consuming msg 7
[S] 8.0 sending msg 8
[R] 8.0 consuming msg 8
[S] 9.0 sending msg 9
[R] 9.0 consuming msg 9
[S] sending DONE
[R] received DONE
[S] done
[R] done
"""
import time
import zmq
from threading import Thread
# the file used for IPC communication
PIPE = '/tmp/fifo-pipe'
# command flags for our tiny message protocol
DONE = b'\x00'
MSG = b'\x01'
def receiver():
    """Bind a PULL socket on the IPC endpoint and consume messages until DONE.

    Every incoming message is read as a list of frames. The first frame is a
    command flag: if it equals ``DONE`` the loop stops; otherwise the second
    frame is taken as the payload and printed together with the time elapsed
    since the module-level ``t0``.

    The socket and the context are released in ``finally`` clauses so that
    they are cleaned up even when ``bind`` or ``recv_multipart`` raises.
    """
    ctx = zmq.Context()
    try:
        s = ctx.socket(zmq.PULL)
        try:
            s.bind("ipc://%s" % PIPE)
            while True:
                parts = s.recv_multipart()
                cmd = parts[0]
                if cmd == DONE:
                    # single-argument print(...) emits the same text whether
                    # print is a statement or a function
                    print("[R] received DONE")
                    break
                msg = parts[1]
                # handle the message
                print("[R] %.1f consuming %s" % (time.time() - t0, msg))
        finally:
            # close the socket before terminating the context that owns it
            s.close()
    finally:
        ctx.term()
    print("[R] done")
def sender():
    """Connect a PUSH socket to the IPC endpoint, send ten messages, then DONE.

    Each message goes out as two frames, the ``MSG`` flag followed by the
    payload ``msg <i>``, with a one-second pause after every send. When all
    ten are sent, a single ``DONE`` frame is sent, the socket is closed and
    the context is terminated.
    """
    context = zmq.Context()
    push = context.socket(zmq.PUSH)
    push.connect("ipc://%s" % PIPE)

    index = 0
    while index < 10:
        payload = b'msg %i' % index
        # times are reported relative to the module-level t0
        elapsed = time.time() - t0
        print("[S] %.1f sending %s" % (elapsed, payload))
        push.send_multipart([MSG, payload])
        time.sleep(1)
        index += 1

    print("[S] sending DONE")
    push.send(DONE)

    push.close()
    context.term()
    print("[S] done")
# t0 is a module-level global read by both workers, so that the times they
# print are relative to program start rather than to 1970
t0 = time.time()

# build both worker threads up front; nothing runs until start() is called
s = Thread(target=sender)
r = Thread(target=receiver)

# the sender goes first ...
s.start()

# ... and the receiver is only started after a five-second delay
time.sleep(5)
r.start()

# block until both workers have finished
s.join()
r.join()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment