Skip to content

Instantly share code, notes, and snippets.

@zacharyvoase
Created February 8, 2012 15:28
Show Gist options
  • Star 10 You must be signed in to star a gist
  • Fork 7 You must be signed in to fork a gist
  • Save zacharyvoase/1770447 to your computer and use it in GitHub Desktop.
Save zacharyvoase/1770447 to your computer and use it in GitHub Desktop.
zmqc: A small but powerful command-line interface to ZMQ.
#!/usr/bin/env python
# zmqc: a small but powerful command-line interface to ZMQ.
## Usage:
# zmqc [-0] (-r | -w) (-b | -c) SOCK_TYPE [-o SOCK_OPT=VALUE...] address [address ...]
## Examples:
# zmqc -rc SUB 'tcp://127.0.0.1:5000'
#
# Subscribe to 'tcp://127.0.0.1:5000', reading messages from it and printing
# them to the console. This will subscribe to all messages by default.
#
# ls | zmqc -wb PUSH 'tcp://*:4000'
#
# Send the name of every file in the current directory as a message from a
# PUSH socket bound to port 4000 on all interfaces. Don't forget to quote the
# address to avoid glob expansion.
#
# zmqc -rc PULL 'tcp://127.0.0.1:5202' | tee $TTY | zmqc -wc PUSH 'tcp://127.0.0.1:5404'
#
# Read messages coming from a PUSH socket bound to port 5202 (note that we're
# connecting with a PULL socket), echo them to the active console, and
# forward them to a PULL socket bound to port 5404 (so we're connecting with
# a PUSH).
#
# zmqc -n 10 -0rb PULL 'tcp://*:4123' | xargs -0 grep 'pattern'
#
# Bind to a PULL socket on port 4123, receive 10 messages from the socket
# (with each message representing a filename), and grep the files for
# `'pattern'`. The `-0` option means messages will be NULL-delimited rather
# than separated by newlines, so that filenames with spaces in them are not
# considered two separate arguments by xargs.
## License:
# This is free and unencumbered software released into the public domain.
#
# Anyone is free to copy, modify, publish, use, compile, sell, or
# distribute this software, either in source code form or as a compiled
# binary, for any purpose, commercial or non-commercial, and by any
# means.
#
# In jurisdictions that recognize copyright laws, the author or authors
# of this software dedicate any and all copyright interest in the
# software to the public domain. We make this dedication for the benefit
# of the public at large and to the detriment of our heirs and
# successors. We intend this dedication to be an overt act of
# relinquishment in perpetuity of all present and future rights to this
# software under copyright law.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
# IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR
# OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
# ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
# OTHER DEALINGS IN THE SOFTWARE.
# For more information, please refer to <http://unlicense.org/>
import argparse
import array
import errno
import itertools
import re
import sys
import zmq
__version__ = '0.0.1'
class ParserError(Exception):
"""An exception which occurred when parsing command-line arguments."""
pass
parser = argparse.ArgumentParser(
prog='zmqc', version=__version__,
usage=
"%(prog)s [-h] [-v] [-0] (-r | -w) (-b | -c)\n "
"SOCK_TYPE [-o SOCK_OPT=VALUE...]\n "
"address [address ...]",
description="zmqc is a small but powerful command-line interface to ZMQ. "
"It allows you to create a socket of a given type, bind or connect it to "
"multiple addresses, set options on it, and receive or send messages over "
"it using standard I/O, in the shell or in scripts.",
epilog="This is free and unencumbered software released into the public "
"domain. For more information, please refer to <http://unlicense.org>.",
)
parser.add_argument('-0',
dest='delimiter', action='store_const',
const='\x00', default='\n',
help="Separate messages on input/output should be "
"delimited by NULL characters (instead of newlines). Use "
"this if your messages may contain newlines, and you want "
"to avoid ambiguous message borders.")
parser.add_argument('-n', metavar='NUM',
dest='number', type=int, default=None,
help="Receive/send only NUM messages. By default, zmqc "
"lives forever in 'write' mode, or until the end of input "
"in 'read' mode.")
mode_group = parser.add_argument_group(title='Mode')
mode = mode_group.add_mutually_exclusive_group(required=True)
mode.add_argument('-r', '--read',
dest='mode', action='store_const', const='r',
help="Read messages from the socket onto stdout.")
mode.add_argument('-w', '--write',
dest='mode', action='store_const', const='w',
help="Write messages from stdin to the socket.")
behavior_group = parser.add_argument_group(title='Behavior')
behavior = behavior_group.add_mutually_exclusive_group(required=True)
behavior.add_argument('-b', '--bind',
dest='behavior', action='store_const', const='bind',
help="Bind to the specified address(es).")
behavior.add_argument('-c', '--connect',
dest='behavior', action='store_const', const='connect',
help="Connect to the specified address(es).")
sock_params = parser.add_argument_group(title='Socket parameters')
sock_type = sock_params.add_argument('sock_type', metavar='SOCK_TYPE',
choices=('PUSH', 'PULL', 'PUB', 'SUB', 'PAIR'), type=str.upper,
help="Which type of socket to create. Must be one of 'PUSH', 'PULL', "
"'PUB', 'SUB' or 'PAIR'. See `man zmq_socket` for an explanation of the "
"different types. 'REQ', 'REP', 'DEALER' and 'ROUTER' sockets are "
"currently unsupported. --read mode is unsupported for PUB sockets, and "
"--write mode is unsupported for SUB sockets.")
sock_opts = sock_params.add_argument('-o', '--option',
metavar='SOCK_OPT=VALUE', dest='sock_opts', action='append', default=[],
help="Socket option names and values to set on the created socket. "
"Consult `man zmq_setsockopt` for a comprehensive list of options. Note "
"that you can safely omit the 'ZMQ_' prefix from the option name. If the "
"created socket is of type 'SUB', and no 'SUBSCRIBE' options are given, "
"the socket will automatically be subscribed to everything.")
addresses = sock_params.add_argument('addresses', nargs='+', metavar='address',
help="One or more addresses to bind/connect to. Must be in full ZMQ "
"format (e.g. 'tcp://<host>:<port>')")
def read_until_delimiter(stream, delimiter):
"""
Read from a stream until a given delimiter or EOF, or raise EOFError.
>>> io = StringIO("abcXdefgXfoo")
>>> read_until_delimiter(io, "X")
"abc"
>>> read_until_delimiter(io, "X")
"defg"
>>> read_until_delimiter(io, "X")
"foo"
>>> read_until_delimiter(io, "X")
Traceback (most recent call last):
...
EOFError
"""
output = array.array('c')
c = stream.read(1)
while c and c != delimiter:
output.append(c)
c = stream.read(1)
if not (c or output):
raise EOFError
return output.tostring()
def get_sockopts(sock_opts):
"""
Turn a list of 'OPT=VALUE' into a list of (opt_code, value).
Work on byte string options:
>>> get_sockopts(['SUBSCRIBE=', 'SUBSCRIBE=abc'])
[(6, ''), (6, 'abc')]
Automatically convert integer options to integers:
>>> zmqc.get_sockopts(['LINGER=0', 'LINGER=-1', 'LINGER=50'])
[(17, 0), (17, -1), (17, 50)]
Spew on invalid input:
>>> zmqc.get_sockopts(['LINGER=foo'])
Traceback (most recent call last):
...
zmqc.ParserError: Invalid value for option LINGER: 'foo'
>>> zmqc.get_sockopts(['NONEXISTENTOPTION=blah'])
Traceback (most recent call last):
...
zmqc.ParserError: Unrecognised socket option: 'NONEXISTENTOPTION'
"""
option_coerce = {
int: set(zmq.core.constants.int_sockopts).union(
zmq.core.constants.int64_sockopts),
str: set(zmq.core.constants.bytes_sockopts)
}
options = []
for option in sock_opts:
match = re.match(r'^([A-Z_]+)\=(.*)$', option)
if not match:
raise ParserError("Invalid option spec: %r" % match)
opt_name = match.group(1)
if opt_name.startswith('ZMQ_'):
opt_name = opt_name[4:]
try:
opt_code = getattr(zmq.core.constants, opt_name.upper())
except AttributeError:
raise ParserError("Unrecognised socket option: %r" % (
match.group(1),))
opt_value = match.group(2)
for converter, opt_codes in option_coerce.iteritems():
if opt_code in opt_codes:
try:
opt_value = converter(opt_value)
except (TypeError, ValueError):
raise ParserError("Invalid value for option %s: %r" % (
opt_name, opt_value))
break
options.append((opt_code, opt_value))
return options
def main():
args = parser.parse_args()
context = zmq.Context.instance()
sock = context.socket(getattr(zmq, args.sock_type))
# Bind or connect to the provided addresses.
for address in args.addresses:
getattr(sock, args.behavior)(address)
# Set any specified socket options.
try:
sock_opts = get_sockopts(args.sock_opts)
except ParserError, exc:
parser.error(str(exc))
else:
for opt_code, opt_value in sock_opts:
sock.setsockopt(opt_code, opt_value)
# If we have a 'SUB' socket that's not explicitly subscribed to
# anything, subscribe it to everything.
if (sock.socket_type == zmq.SUB and
not any(opt_code == zmq.SUBSCRIBE
for (opt_code, _) in sock_opts)):
sock.setsockopt(zmq.SUBSCRIBE, '')
# Live forever if no `-n` argument was given, otherwise die after a fixed
# number of messages.
if args.number is None:
iterator = itertools.repeat(None)
else:
iterator = itertools.repeat(None, args.number)
try:
if args.mode == 'r':
read_loop(iterator, sock, args.delimiter, sys.stdout)
elif args.mode == 'w':
write_loop(iterator, sock, args.delimiter, sys.stdin)
finally:
sock.close()
def read_loop(iterator, sock, delimiter, output):
"""Continously get messages from the socket and print them on output."""
for _ in iterator:
try:
message = sock.recv()
output.write(message + delimiter)
output.flush()
except KeyboardInterrupt:
return
except IOError, exc:
if exc.errno == errno.EPIPE:
return
raise
def write_loop(iterator, sock, delimiter, input):
"""Continously get messages from input and send them through the socket."""
for _ in iterator:
try:
message = read_until_delimiter(input, delimiter)
sock.send(message)
except (KeyboardInterrupt, EOFError):
return
if __name__ == '__main__':
main()
@glumb
Copy link

glumb commented Aug 22, 2017

Hey, this looks like a great tool, but unfortunetely it does not work anymore

Traceback (most recent call last):
  File "./zeromy.py", line 299, in <module>
    main()
  File "./zeromy.py", line 241, in main
    sock_opts = get_sockopts(args.sock_opts)
  File "./zeromy.py", line 196, in get_sockopts
    int: set(zmq.core.constants.int_sockopts).union(
AttributeError: 'module' object has no attribute 'core'

I assume the pyzmq API changed.

@glumb
Copy link

glumb commented Aug 22, 2017

Found a solution: remove the core from core.constants

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment