Created
February 1, 2012 21:16
-
-
Save Lothiraldan/1719522 to your computer and use it in GitHub Desktop.
ZMQStreams util function
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Come from excellent http://stefan.sofa-rockers.org/2012/02/01/designing-and-testing-pyzmq-applications-part-1/ | |
def stream(context, loop, sock_type, addr, bind, callback=None, subscribe=b'', | |
random=False): | |
""" | |
Creates a :class:`~zmq.eventloop.zmqstream.ZMQStream`. | |
:param sock_type: The ØMQ socket type (e.g. ``zmq.REQ``) | |
:param addr: Address to bind or connect to formatted as *host:port*, | |
*(host, port)* or *host* (bind to random port). | |
If *bind* is ``True``, *host* may be: | |
- the wild-card ``*``, meaning all available interfaces, | |
- the primary IPv4 address assigned to the interface, in its | |
numeric representation or | |
- the interface name as defined by the operating system. | |
If *bind* is ``False``, *host* may be: | |
- the DNS name of the peer or | |
- the IPv4 address of the peer, in its numeric representation. | |
If *addr* is just a host name without a port and *bind* is | |
``True``, the socket will be bound to a random port. | |
:param bind: Binds to *addr* if ``True`` or tries to connect to it | |
otherwise. | |
:param callback: A callback for | |
:meth:`~zmq.eventloop.zmqstream.ZMQStream.on_recv`, optional | |
:param subscribe: Subscription pattern for *SUB* sockets, optional, | |
defaults to ``b''``. | |
:returns: A tuple containg the stream and the port number. | |
UPDATE: | |
addr is now a valid ØMQ address (like tcp://127.0.0.1 or ipc:///tmp/test) but you must use random=True if you want to bind to a random port. Return the port only if not binding a random port. | |
""" | |
sock = context.socket(sock_type) | |
# Bind/connect the socket | |
if bind: | |
if not random: | |
sock.bind(addr) | |
else: | |
port = sock.bind_to_random_port(addr) | |
else: | |
sock.connect(addr) | |
# Add a default subscription for SUB sockets | |
if sock_type == zmq.SUB: | |
sock.setsockopt(zmq.SUBSCRIBE, subscribe) | |
# Create the stream and add the callback | |
stream = zmqstream.ZMQStream(sock, loop) | |
if callback: | |
stream.on_recv(callback) | |
if random: | |
return stream, int(port) | |
else: | |
return stream |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment