Created
March 2, 2013 02:09
-
-
Save minrk/5069344 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""FIFO Queue with zmq IPC | |

for http://stackoverflow.com/questions/15166039

Typical output:

    [S] 0.0 sending msg 0
    [S] 1.0 sending msg 1
    [S] 2.0 sending msg 2
    [S] 3.0 sending msg 3
    [S] 4.0 sending msg 4
    [S] 5.0 sending msg 5
    [R] 5.2 consuming msg 0
    [R] 5.2 consuming msg 1
    [R] 5.2 consuming msg 2
    [R] 5.2 consuming msg 3
    [R] 5.2 consuming msg 4
    [R] 5.2 consuming msg 5
    [S] 6.0 sending msg 6
    [R] 6.0 consuming msg 6
    [S] 7.0 sending msg 7
    [R] 7.0 consuming msg 7
    [S] 8.0 sending msg 8
    [R] 8.0 consuming msg 8
    [S] 9.0 sending msg 9
    [R] 9.0 consuming msg 9
    [S] sending DONE
    [R] received DONE
    [S] done
    [R] done
"""
import time | |
import zmq | |
from threading import Thread | |
# The file used for IPC communication (backs the ipc:// endpoint).
PIPE = '/tmp/fifo-pipe'

# Command flags for our tiny message protocol; each is a single byte.
DONE = b'\x00'
MSG = b'\x01'
def receiver():
    """Bind a PULL socket on the IPC pipe and consume messages until DONE.

    The first frame of each received message is treated as the command
    flag.  A ``DONE`` flag ends the loop; for any other flag the second
    frame is taken as the payload and printed together with the time
    elapsed since the module-level ``t0``.
    """
    ctx = zmq.Context()
    s = ctx.socket(zmq.PULL)
    try:
        s.bind("ipc://%s" % PIPE)
        while True:
            parts = s.recv_multipart()
            cmd = parts[0]
            if cmd == DONE:
                print("[R] received DONE")
                break
            # Frames arrive as bytes; decode so the log reads "msg 0"
            # rather than "b'msg 0'" when run on Python 3.
            msg = parts[1].decode('utf-8', 'replace')
            # handle the message
            print("[R] %.1f consuming %s" % (time.time() - t0, msg))
    finally:
        # Release the socket and context even if bind/recv raised.
        s.close()
        ctx.term()
    print("[R] done")
def sender():
    """Connect a PUSH socket to the IPC pipe, send ten messages, then DONE.

    Each of the ten messages is sent as two frames, ``[MSG, payload]``,
    with a one-second pause after each send.  A single-frame ``DONE``
    message follows the last payload.
    """
    ctx = zmq.Context()
    s = ctx.socket(zmq.PUSH)
    try:
        s.connect("ipc://%s" % PIPE)
        for i in range(10):
            text = 'msg %i' % i
            # Encode explicitly instead of %-formatting a bytes literal,
            # which is not supported on Python 3.0-3.4.
            msg = text.encode('ascii')
            # Log the text form so the output is "msg 0" on Python 2 and 3.
            print("[S] %.1f sending %s" % (time.time() - t0, text))
            s.send_multipart([MSG, msg])
            time.sleep(1)
        print("[S] sending DONE")
        s.send(DONE)
    finally:
        # Release the socket and context even if connect/send raised.
        s.close()
        ctx.term()
    print("[S] done")
# Global t0, just for keeping times relative to start, rather than 1970.
# Both sender() and receiver() read it when printing timestamps.
t0 = time.time()

# Start the sender first, before any receiver exists.
s = Thread(target=sender)
s.start()

# Start the receiver after a 5-second delay, so the first few messages
# are sent before the PULL socket has been bound.
time.sleep(5)
r = Thread(target=receiver)
r.start()

# Wait for them both to finish.
s.join()
r.join()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment