Watch IOPub traffic from IPython engines
import os
import sys
import json
import zmq
from IPython.zmq.session import Session
from IPython.parallel.util import disambiguate_url
from IPython.utils.path import get_ipython_dir
def main(connection_file):
"""watch iopub channel, and print messages"""
ctx = zmq.Context.instance()
with open(connection_file) as f:
cfg = json.loads(
location = cfg['location']
reg_url = cfg['url']
session = Session(key=cfg['exec_key'].encode('ascii'))
query = ctx.socket(zmq.XREQ)
query.connect(disambiguate_url(cfg['url'], location))
session.send(query, "connection_request")
idents,msg = session.recv(query, mode=0)
c = msg['content']
iopub_url = disambiguate_url(c['iopub'], location)
sub = ctx.socket(zmq.SUB)
# This will subscribe to all messages:
sub.setsockopt(zmq.SUBSCRIBE, b'')
# replace with b'' with b'engine.1.stdout' to subscribe only to engine 1's stdout
# 0MQ subscriptions are simple 'foo*' matches, so 'engine.1.' subscribes
# to everything from engine 1, but there is no way to subscribe to
# just stdout from everyone.
# multiple calls to subscribe will add subscriptions, e.g. to subscribe to
# engine 1's stderr and engine 2's stdout:
# sub.setsockopt(zmq.SUBSCRIBE, b'engine.1.stderr')
# sub.setsockopt(zmq.SUBSCRIBE, b'engine.2.stdout')
while True:
idents,msg = session.recv(sub, mode=0)
# ident always length 1 here
topic = idents[0]
if msg['msg_type'] == 'stream':
# stdout/stderr
# stream names are in msg['content']['name'], if you want to handle
# them differently
print "%s: %s" % (topic, msg['content']['data'])
elif msg['msg_type'] == 'pyerr':
# Python traceback
c = msg['content']
print topic + ':'
for line in c['traceback']:
# indent lines
print ' ' + line
if __name__ == '__main__':
if len(sys.argv) > 1:
cf = sys.argv[1]
# default path:
cf = os.path.join(get_ipython_dir(), 'profile_default', 'security', 'ipcontroller-client.json')
