@Lothiraldan
Created February 1, 2012 21:16
ZMQStreams util function
# Adapted from the excellent http://stefan.sofa-rockers.org/2012/02/01/designing-and-testing-pyzmq-applications-part-1/
import zmq
from zmq.eventloop import zmqstream


def stream(context, loop, sock_type, addr, bind, callback=None, subscribe=b'',
           random=False):
    """
    Creates a :class:`~zmq.eventloop.zmqstream.ZMQStream`.

    :param context: The ØMQ context used to create the socket.
    :param loop: The event loop the stream is attached to.
    :param sock_type: The ØMQ socket type (e.g. ``zmq.REQ``)
    :param addr: A valid ØMQ address to bind or connect to (like
        ``tcp://127.0.0.1`` or ``ipc:///tmp/test``).

        If *bind* is ``True``, the host part of a TCP address may be:

        - the wild-card ``*``, meaning all available interfaces,
        - the primary IPv4 address assigned to the interface, in its
          numeric representation or
        - the interface name as defined by the operating system.

        If *bind* is ``False``, the host part may be:

        - the DNS name of the peer or
        - the IPv4 address of the peer, in its numeric representation.

    :param bind: Binds to *addr* if ``True`` or tries to connect to it
        otherwise.
    :param callback: A callback for
        :meth:`~zmq.eventloop.zmqstream.ZMQStream.on_recv`, optional
    :param subscribe: Subscription pattern for *SUB* sockets, optional,
        defaults to ``b''``.
    :param random: If ``True`` (and *bind* is ``True``), binds to a random
        port; *addr* must then be given without a port.
    :returns: The stream, or a tuple containing the stream and the port
        number if *random* is ``True``.
    """
    # A random port can only be chosen when binding.
    if random and not bind:
        raise ValueError('random=True requires bind=True')

    sock = context.socket(sock_type)

    # Bind/connect the socket
    if bind:
        if not random:
            sock.bind(addr)
        else:
            port = sock.bind_to_random_port(addr)
    else:
        sock.connect(addr)

    # Add a default subscription for SUB sockets
    if sock_type == zmq.SUB:
        sock.setsockopt(zmq.SUBSCRIBE, subscribe)

    # Create the stream and add the callback
    stream = zmqstream.ZMQStream(sock, loop)
    if callback:
        stream.on_recv(callback)

    # The port is only known (and returned) when a random port was bound
    if random:
        return stream, int(port)
    else:
        return stream
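
Because the function returns either a bare stream or a (stream, port) tuple depending on `random`, callers may find it convenient to normalize the result. The following is a minimal sketch; `split_stream_result` is a hypothetical helper that is not part of the original gist, and it assumes that a tuple result always has exactly two items.

```python
def split_stream_result(result):
    """Normalize the return value of stream() to a (stream, port) pair.

    Hypothetical helper, not part of the original gist. The port is None
    when stream() returned a bare stream (no random port was bound).
    """
    if isinstance(result, tuple):
        # random=True case: stream() returned (stream, port)
        stream_obj, port = result
        return stream_obj, port
    # random=False case: stream() returned only the stream
    return result, None
```

With this helper, calling code can always unpack two values, for example `s, port = split_stream_result(stream(ctx, loop, zmq.REP, 'tcp://127.0.0.1', bind=True, random=True))`, and check `port is None` when a fixed address was used.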