Last active
August 29, 2015 14:09
-
-
Save lebedov/2e27e71a1759581cecdf to your computer and use it in GitHub Desktop.
ZeroMQ asynchronous socket pair synchronizer.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
""" | |
Synchronize PUB/SUB sockets. | |
""" | |
import multiprocessing as mp | |
import time | |
import zmq | |
from zerosync import sync_pub, sync_sub | |
def pub(): | |
print 'pub starting' | |
ctx = zmq.Context() | |
sock = ctx.socket(zmq.PUB) | |
sock.bind('ipc://test') | |
sync_pub(sock, ['sub0', 'sub1']) | |
#sync_pub(sock, 2) | |
print 'pub synchronized' | |
# Send data: | |
for i in xrange(10): | |
sock.send(str(i)) | |
sock.send('quit') | |
def sub(i): | |
id = 'sub%s' % i | |
print id+' starting' | |
ctx = zmq.Context() | |
sock = ctx.socket(zmq.SUB) | |
sock.setsockopt(zmq.SUBSCRIBE, '') | |
sock.connect('ipc://test') | |
sync_sub(sock, id) | |
print id+' synchronized' | |
# Receive data: | |
while True: | |
data = sock.recv() | |
print id+' '+str(data) | |
if data == 'quit': | |
break | |
if __name__ == '__main__': | |
p = mp.Process(target=pub) | |
p.start() | |
s0 = mp.Process(target=sub, args=(0,)) | |
s0.start() | |
s1 = mp.Process(target=sub, args=(1,)) | |
s1.start() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
""" | |
Synchronize ROUTER/DEALER sockets. | |
""" | |
import multiprocessing as mp | |
import time | |
import zmq | |
from zerosync import sync_router, sync_dealer | |
def router(): | |
print 'router starting' | |
ctx = zmq.Context() | |
sock = ctx.socket(zmq.ROUTER) | |
sock.bind('ipc://test') | |
sync_router(sock, ['dealer0', 'dealer1']) | |
print 'router synchronized' | |
# Send data: | |
for i in xrange(5): | |
sock.send_multipart(['dealer0', str(i)]) | |
for i in xrange(5, 10): | |
sock.send_multipart(['dealer1', str(i)]) | |
sock.send_multipart(['dealer0', 'quit']) | |
sock.send_multipart(['dealer1', 'quit']) | |
def dealer(i): | |
id = 'dealer%s' % i | |
print id+' starting' | |
ctx = zmq.Context() | |
sock = ctx.socket(zmq.DEALER) | |
sock.setsockopt(zmq.IDENTITY, id) | |
sock.connect('ipc://test') | |
sync_dealer(sock, sock.getsockopt(zmq.IDENTITY)) | |
print id+' synchronized' | |
# Receive data: | |
while True: | |
data = sock.recv() | |
print id+' '+str(data) | |
if data == 'quit': | |
break | |
if __name__ == '__main__': | |
r = mp.Process(target=router) | |
r.start() | |
d0 = mp.Process(target=dealer, args=(0,)) | |
d0.start() | |
d1 = mp.Process(target=dealer, args=(1,)) | |
d1.start() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
""" | |
ZeroMQ asynchronous socket pair synchronizer. | |
""" | |
import numbers | |
import zmq | |
def sync_pub(sock, ids, sync_addr='ipc://sync', timeout=10): | |
""" | |
Synchronize a single PUB socket with multiple SUB sockets. | |
Must be paired with a call to `sync_sub`. | |
Parameters | |
---------- | |
sock : zmq.Socket | |
PUB socket to synchronize. | |
ids : sequence of str or int | |
If a sequence, specifies the IDs associated with SUB sockets. If an | |
integer, specifies the number of SUB sockets to synchronize. | |
sync_addr : str | |
Port address to use for synchronization socket pair. This | |
must be the same as that passed to `sync_sub`. | |
timeout : int | |
Polling timeout. | |
Notes | |
----- | |
Some synchronization messages may be delivered to the SUB sockets | |
after this routine exits. | |
""" | |
assert sock.getsockopt(zmq.TYPE) == zmq.PUB | |
sock_sync = sock.context.socket(zmq.ROUTER) | |
sock_sync.bind(sync_addr) | |
if isinstance(ids, numbers.Integral): | |
# ids indicates the number of subscribers: | |
id_set = set() | |
while True: | |
sock.send('') | |
if sock_sync.poll(timeout): | |
id, _ = sock_sync.recv_multipart() | |
id_set.add(id) | |
if len(id_set) == ids: | |
break | |
else: | |
# ids contains the subscriber sync port IDs: | |
id_set = set(ids) | |
while True: | |
sock.send('') | |
if sock_sync.poll(timeout): | |
id, _ = sock_sync.recv_multipart() | |
try: | |
id_set.remove(id) | |
except: | |
pass | |
if not id_set: | |
break | |
def sync_sub(sock, id, sync_addr='ipc://sync', timeout=10): | |
""" | |
Synchronize a SUB socket with a single PUB socket. | |
Must be paired with a call to `sync_pub`. | |
Parameters | |
---------- | |
sock : zmq.Socket | |
SUB socket to synchronize. | |
id : str | |
IDs associated with SUB socket. | |
sync_addr : str | |
Port address to use for synchronization socket pair. This | |
must be the same as that passed to `sync_pub`. | |
timeout : int | |
Polling timeout. | |
Notes | |
----- | |
Some synchronization messages may be delivered to the specified SUB socket | |
after this routine exits. | |
""" | |
assert sock.getsockopt(zmq.TYPE) == zmq.SUB | |
sock_sync = sock.context.socket(zmq.DEALER) | |
sock_sync.setsockopt(zmq.IDENTITY, id) | |
sock_sync.connect(sync_addr) | |
while True: | |
if sock.poll(timeout): | |
sock.recv() | |
sock_sync.send('') | |
break | |
def sync_router(sock, ids, sync_addr='ipc://sync', timeout=10): | |
""" | |
Synchronize a single ROUTER socket with multiple DEALER sockets. | |
Must be paired with a call to `sync_dealer`. | |
Parameters | |
---------- | |
sock : zmq.Socket | |
ROUTER socket to synchronize. | |
ids : sequence of str | |
IDs associated with DEALER synchronization sockets | |
sync_addr : str | |
Port address to use for synchronization socket pair. This | |
must be the same as that passed to `sync_dealer`. | |
timeout : int | |
Polling timeout. | |
""" | |
assert sock.getsockopt(zmq.TYPE) == zmq.ROUTER | |
sock_sync = sock.context.socket(zmq.ROUTER) | |
sock_sync.bind(sync_addr) | |
id_set = set(ids) | |
while True: | |
for id in id_set: | |
sock.send_multipart([id, '']) | |
if sock_sync.poll(timeout): | |
id, _ = sock_sync.recv_multipart() | |
# Make the last sync message different so that the dealer | |
# will know when to stop discarding messages: | |
if id in id_set: | |
sock.send_multipart([id, 'done']) | |
id_set.remove(id) | |
if not id_set: | |
break | |
def sync_dealer(sock, id=None, sync_addr='ipc://sync', timeout=10): | |
""" | |
Synchronize a DEALER socket with a single ROUTER socket. | |
Must be paired with a call to `sync_router`. | |
Parameters | |
---------- | |
sock : zmq.Socket | |
DEALER socket to synchronize. | |
id : str | |
ID associated with ROUTER synchronization socket. | |
If not specified, the ID is assumed to be the same as that | |
of the specified DEALER socket. | |
sync_addr : str | |
Port address to use for synchronization socket pair. This | |
must be the same as that passed to `sync_router`. | |
timeout : int | |
Polling timeout. | |
""" | |
assert sock.getsockopt(zmq.TYPE) == zmq.DEALER | |
sock_sync = sock.context.socket(zmq.DEALER) | |
if id is None: | |
sock_sync.setsockopt(zmq.IDENTITY, sock.getsockopt(zmq.IDENTITY)) | |
else: | |
sock_sync.setsockopt(zmq.IDENTITY, id) | |
sock_sync.connect(sync_addr) | |
while True: | |
if sock.poll(timeout): | |
msg = sock.recv() | |
sock_sync.send('') | |
# Discard messages until the last synchronization message is | |
# received: | |
while sock.poll(timeout): | |
if sock.recv() == 'done': | |
break | |
break |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment