Skip to content

Instantly share code, notes, and snippets.

@pohmelie
Created April 5, 2017 12:15
Show Gist options
  • Save pohmelie/a7f53e1e64fe4906f0b9ed6d7e8a4469 to your computer and use it in GitHub Desktop.
Save pohmelie/a7f53e1e64fe4906f0b9ed6d7e8a4469 to your computer and use it in GitHub Desktop.
aiozmq XPUB/XSUB example (sample output first, followed by the script source)
xpub connection made
xsub connection made
xsub binding tcp://127.0.0.1:45098
pub connection made
xpub binding tcp://127.0.0.1:34462
sub connection made
xpub message received [b'\x01test']
xpub writing [b'\x01test'] to xsub
xsub message received [b'test', b'0']
xsub writing [b'test', b'0'] to xpub
sub message received [b'test', b'0']
xsub message received [b'test', b'1']
xsub writing [b'test', b'1'] to xpub
sub message received [b'test', b'1']
xsub message received [b'test', b'2']
xsub writing [b'test', b'2'] to xpub
sub message received [b'test', b'2']
xpub message received [b'\x00test']
xpub writing [b'\x00test'] to xsub
import asyncio
import aiozmq
import zmq
class Protocol(aiozmq.ZmqProtocol):
    """Logging ZeroMQ protocol that can forward messages to another protocol.

    Every callback prints what happened, prefixed with ``name``. When
    ``sink`` refers to another ``Protocol``, each received message is also
    written to that protocol's transport.
    """

    def __init__(self, name):
        """Create a protocol labelled *name* with no sink and no transport."""
        self.name = name
        self.sink = None
        # Defined up front so the attribute exists before connection_made()
        # runs and after connection_lost() clears it.
        self.transport = None

    def connection_made(self, transport):
        """Log the event and remember *transport* for later writes."""
        print(self.name, "connection made")
        self.transport = transport

    def connection_lost(self, exc):
        """Log the event and drop the transport reference."""
        print(self.name, "connection lost")
        self.transport = None

    def msg_received(self, data):
        """Log *data* and, if a connected sink is set, write it to the sink."""
        print(self.name, "message received", data)
        sink = self.sink
        # A sink that is unset, or whose transport is None (not yet
        # connected / already lost), cannot be written to; skip forwarding
        # instead of raising AttributeError inside the callback.
        if not sink or sink.transport is None:
            return
        print(self.name, "writing", data, "to", sink.name)
        sink.transport.write(data)

    @property
    def binding(self):
        """Return the first endpoint from the transport's ``bindings()``.

        The endpoint is also printed, prefixed with ``name``.
        """
        binding = next(iter(self.transport.bindings()))
        print(self.name, "binding", binding)
        return binding
async def test_aiozmq():
    """Exercise a PUB -> XSUB -> XPUB -> SUB chain built from four sockets.

    An XPUB and an XSUB socket are bound to wildcard TCP ports on
    127.0.0.1 and cross-wired as each other's sink. A PUB socket connects
    to the XSUB endpoint and a SUB socket connects to the XPUB endpoint.
    The SUB side subscribes to ``b"test"``, three ``b"test"`` messages and
    one ``b"foo"`` message are published, and the subscription is then
    removed. Every transport that was opened is closed on exit, including
    when a later step raises.
    """
    opened = []  # every transport created so far; closed in the finally

    async def connect(name, zmq_type, **endpoint):
        # Create one socket whose protocol is labelled *name* and record
        # its transport so it is closed even if a later step fails.
        transport, protocol = await aiozmq.create_zmq_connection(
            lambda: Protocol(name),
            zmq_type,
            **endpoint
        )
        opened.append(transport)
        return transport, protocol

    try:
        _, xpub_p = await connect("xpub", zmq.XPUB, bind="tcp://127.0.0.1:*")
        _, xsub_p = await connect("xsub", zmq.XSUB, bind="tcp://127.0.0.1:*")
        # Cross-wire the two bound sockets: whatever one receives is
        # written to the other.
        xpub_p.sink, xsub_p.sink = xsub_p, xpub_p

        pub, _ = await connect("pub", zmq.PUB, connect=xsub_p.binding)
        sub, _ = await connect("sub", zmq.SUB, connect=xpub_p.binding)

        sub.subscribe(b"test")
        # Pause so the subscription can travel through the chain before
        # anything is published.
        await asyncio.sleep(1)
        for i in range(3):
            pub.write([b"test", str(i).encode()])
        pub.write([b"foo", b"123"])
        await asyncio.sleep(1)
        sub.unsubscribe(b"test")
        await asyncio.sleep(1)
    finally:
        # Close the most recently opened sockets first.
        for transport in reversed(opened):
            transport.close()
if __name__ == "__main__":
    # Create and own the loop explicitly: asyncio.get_event_loop() is
    # deprecated when no loop is running, and an explicitly created loop
    # can be closed deterministically once the demo finishes.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        loop.run_until_complete(test_aiozmq())
    finally:
        loop.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment