public
Last active

simple example script for running and testing notebooks.

  • Download Gist
ipnbdoctest.py
Python
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200
#!/usr/bin/env python
"""
simple example script for running and testing notebooks.
 
Usage: `ipnbdoctest.py foo.ipynb [bar.ipynb [...]]`
 
Each cell is submitted to the kernel, and the outputs are compared with those stored in the notebook.
"""
 
import os
import sys
import time
import base64
import re

from collections import defaultdict

try:
    from Queue import Empty  # Python 2 module name
except ImportError:
    from queue import Empty  # the module was renamed in Python 3

try:
    from IPython.kernel import KernelManager
except ImportError:
    # older IPython releases expose the manager under IPython.zmq
    from IPython.zmq.blockingkernelmanager import BlockingKernelManager as KernelManager

from IPython.nbformat.current import reads, NotebookNode
 
 
def compare_png(a64, b64):
    """Compare two base64-encoded PNGs (incomplete placeholder).

    Both payloads are decoded, which checks that they are well-formed
    base64, but no pixel comparison is implemented yet: any two decodable
    inputs are reported as matching.

    Parameters:
        a64, b64: base64 payloads of the two images (text or bytes).

    Returns:
        True, always, once both inputs have been decoded.

    Raises:
        Whatever ``base64.b64decode`` raises for malformed input
        (e.g. incorrect padding).
    """
    # b64decode accepts both text and bytes and exists on Python 2 and 3;
    # base64.decodestring was removed in Python 3.9.
    base64.b64decode(a64)
    base64.b64decode(b64)
    return True
 
def sanitize(s):
    """Sanitize a string for comparison.

    Fixes universal newlines, strips trailing newlines, and normalizes
    values that are likely to differ between runs (memory addresses and
    UUIDs).

    Parameters:
        s: the value to clean up. Anything that is not a string is
            returned unchanged.

    Returns:
        The normalized string, or ``s`` itself for non-string input.
    """
    try:
        string_types = basestring  # Python 2: covers str and unicode
    except NameError:
        string_types = str  # Python 3 has no basestring
    if not isinstance(s, string_types):
        return s
    # normalize newline:
    s = s.replace('\r\n', '\n')
    # ignore trailing newlines (but not space)
    s = s.rstrip('\n')
    # normalize hex addresses (either letter case, since some platforms
    # print pointers in uppercase):
    s = re.sub(r'0x[0-9a-fA-F]+', '0xFFFFFFFF', s)
    # normalize UUIDs (either letter case):
    s = re.sub(
        r'[0-9a-fA-F]{8}(\-[0-9a-fA-F]{4}){3}\-[0-9a-fA-F]{12}',
        'U-U-I-D',
        s,
    )
    return s
 
 
def consolidate_outputs(outputs):
    """Consolidate outputs into a summary dict (incomplete).

    Parameters:
        outputs: iterable of output nodes supporting both attribute access
            (``output_type``, ``stream``, ``text``, ``ename``, ``evalue``)
            and mapping access (``key in out``, ``out[key]``).

    Returns:
        A ``defaultdict(list)`` in which
        - ``'stdout'`` / ``'stderr'`` hold the concatenated stream text
          (empty strings when nothing was written),
        - ``'pyerr'`` holds ``{'ename': ..., 'evalue': ...}`` for the last
          error seen,
        - each rich-output key ('png', 'svg', 'latex', 'html',
          'javascript', 'text', 'jpeg') holds a list of the values found.
    """
    data = defaultdict(list)
    data['stdout'] = ''
    data['stderr'] = ''
    for out in outputs:
        # Output nodes carry their kind in ``output_type``; they have no
        # ``type`` attribute.
        kind = out.output_type
        if kind == 'stream':
            name = out.stream
            if name not in data:
                # seed unknown stream names with text so that += does not
                # extend the default list character by character
                data[name] = ''
            data[name] += out.text
        elif kind == 'pyerr':
            data['pyerr'] = dict(ename=out.ename, evalue=out.evalue)
        else:
            for key in ('png', 'svg', 'latex', 'html', 'javascript', 'text', 'jpeg',):
                if key in out:
                    data[key].append(out[key])
    return data
 
 
def compare_outputs(test, ref, skip_compare=('png', 'traceback', 'latex', 'prompt_number')):
    """Check a freshly produced output against the stored reference output.

    Every key of ``ref`` must be present in ``test``. Unless the key is
    listed in ``skip_compare``, the two values must also be equal after
    both have been passed through ``sanitize``. Keys that exist only in
    ``test`` are not examined.

    Parameters:
        test: mapping describing the output produced by this run.
        ref: mapping describing the output stored in the notebook.
        skip_compare: keys whose presence is required but whose values are
            not compared.

    Returns:
        True when the outputs agree, False at the first problem found.
        A description of the problem is printed to stdout.
    """
    for key in ref:
        if key not in test:
            # list(...) keeps the printed form identical on Python 2 and 3
            print("missing key: %s != %s" % (list(test.keys()), list(ref.keys())))
            return False
        if key in skip_compare:
            continue
        if sanitize(test[key]) != sanitize(ref[key]):
            # single-argument print(...) emits the same text under the
            # Python 2 print statement and the Python 3 print function
            print("mismatch %s:" % key)
            print(test[key])
            print(' != ')
            print(ref[key])
            return False
    return True
 
 
def run_cell(shell, iopub, cell):
    """Execute one code cell and collect the outputs it publishes.

    The cell source is sent over ``shell``; after the shell reply arrives
    (waiting at most 20 s), messages are read from ``iopub`` until none
    arrives within 0.2 s, and each one is converted to a NotebookNode.

    Parameters:
        shell: channel used to submit code (``execute``) and read the
            reply (``get_msg``).
        iopub: channel on which the kernel publishes outputs.
        cell: notebook cell; only ``cell.input`` is read.

    Returns:
        List of NotebookNode outputs in arrival order. A 'clear_output'
        message discards everything collected before it.

    Raises:
        Presumably ``Empty`` when the shell reply does not arrive within
        the timeout, as the iopub channel does -- confirm against the
        channel implementation.
    """
    # print cell.input
    shell.execute(cell.input)
    # wait for finish, maximum 20s
    shell.get_msg(timeout=20)
    outs = []
    while True:
        try:
            msg = iopub.get_msg(timeout=0.2)
        except Empty:
            # nothing further was published: the cell's output is complete
            break
        msg_type = msg['msg_type']
        if msg_type in ('status', 'pyin'):
            continue
        elif msg_type == 'clear_output':
            outs = []
            continue
        content = msg['content']
        # print msg_type, content
        out = NotebookNode(output_type=msg_type)
        if msg_type == 'stream':
            out.stream = content['name']
            out.text = content['data']
        elif msg_type in ('display_data', 'pyout'):
            out['metadata'] = content['metadata']
            for mime, data in content['data'].items():
                attr = mime.split('/')[-1].lower()
                # this gets most right, but fix svg+xml -> svg, plain -> text
                attr = attr.replace('+xml', '').replace('plain', 'text')
                setattr(out, attr, data)
            if msg_type == 'pyout':
                out.prompt_number = content['execution_count']
        elif msg_type == 'pyerr':
            out.ename = content['ename']
            out.evalue = content['evalue']
            out.traceback = content['traceback']
        else:
            print("unhandled iopub msg: %s" % msg_type)
            # Do not record a node that only carries output_type: callers
            # pair outputs positionally with the stored ones, so a stray
            # entry would misalign every comparison after it.
            continue
        outs.append(out)
    return outs
 
def test_notebook(nb):
    """Run every code cell of a notebook and report how well it replicates.

    A fresh kernel is started for the notebook. Each code cell is executed
    with ``run_cell`` and its outputs are compared pairwise with the
    outputs stored in the cell. A '.' is written to stdout for every cell
    that ran, followed by a summary of replicated, mismatched and failed
    cells. The kernel, its channels and the devnull handle used for the
    kernel's stderr are released even if testing is interrupted by an
    exception.

    Parameters:
        nb: parsed notebook; ``nb.worksheets[*].cells`` and
            ``nb.metadata.name`` are read.

    Returns:
        None. Results are reported on stdout only.
    """
    km = KernelManager()
    devnull = open(os.devnull, 'w')
    kernel_started = False
    kc = None
    try:
        km.start_kernel(extra_arguments=['--pylab=inline'], stderr=devnull)
        kernel_started = True
        try:
            kc = km.client()
            kc.start_channels()
            iopub = kc.iopub_channel
        except AttributeError:
            # IPython 0.13
            kc = km
            kc.start_channels()
            iopub = kc.sub_channel
        shell = kc.shell_channel

        # The kernel is launched with --pylab=inline because some notebooks
        # assume it, even though they shouldn't. Run a no-op and drain the
        # iopub channel so start-up messages are not mistaken for output
        # of the first cell.
        shell.execute("pass")
        shell.get_msg()
        while True:
            try:
                iopub.get_msg(timeout=1)
            except Empty:
                break

        successes = 0
        failures = 0
        errors = 0
        for ws in nb.worksheets:
            for cell in ws.cells:
                if cell.cell_type != 'code':
                    continue
                try:
                    outs = run_cell(shell, iopub, cell)
                except Exception as e:
                    print("failed to run cell: %s" % repr(e))
                    print(cell.input)
                    errors += 1
                    continue

                failed = False
                # NOTE(review): zip stops at the shorter sequence, so a
                # surplus of produced or stored outputs goes unreported.
                for out, ref in zip(outs, cell.outputs):
                    if not compare_outputs(out, ref):
                        failed = True
                if failed:
                    failures += 1
                else:
                    successes += 1
                sys.stdout.write('.')

        # print('') emits a bare newline under both the Python 2 print
        # statement and the Python 3 print function
        print('')
        print("tested notebook %s" % nb.metadata.name)
        print(" %3i cells successfully replicated" % successes)
        if failures:
            print(" %3i cells mismatched output" % failures)
        if errors:
            print(" %3i cells failed to complete" % errors)
    finally:
        # Always release kernel resources; leaking them across many
        # notebooks exhausts the process's file descriptors.
        try:
            if kc is not None:
                kc.stop_channels()
        finally:
            try:
                if kernel_started:
                    km.shutdown_kernel()
            finally:
                devnull.close()
 
if __name__ == '__main__':
    # Treat every command-line argument as the path of a notebook to test.
    for notebook_path in sys.argv[1:]:
        print("testing %s" % notebook_path)
        # read the raw JSON first so the file is closed before the kernel starts
        with open(notebook_path) as handle:
            raw_json = handle.read()
        nb = reads(raw_json, 'json')
        test_notebook(nb)

Hi Min!

Your script has been very helpful as a starting point for a script evaluating student hand-ins. Unfortunately, when running large numbers of notebooks, Python eventually crashes with "Too many open files". Here is a simple reproducer:

from IPython.kernel import KernelManager

n = 1

while True:
    km = KernelManager()
    km.start_kernel(extra_arguments=['--pylab=inline'], stderr=open(os.devnull, 'w'))
    kc = km.client()
    kc.start_channels()

    # do some work here

    kc.stop_channels()
    km.shutdown_kernel()
    del kc
    del km

    print '.',
    if n % 10 == 0:
        print 
    n += 1

which crashes after 24 rounds through the loop:

HEP-Admins-MacBook-Pro:results plesser$ ipython ../work/create_and_close.py 
. . . . . . . . . .
. . . . . . . . . .
. . . .Too many open files (signaler.cpp:330)
.Abort trap

It seems that kc.stop_channels(), km.shutdown_kernel(), and the two del statements don't close all connections.

I am using IPython 1.1.0 with Python 2.7.5 (Anaconda) on OSX 10.6.8.

Update: As a work-around, I now create manager and client once in __main__ and then call km.reset_kernel(now=True) at the beginning of test_notebook(). This works fine for at least 175 notebooks.

Best,
Hans

I added comparison of PNG output cells in my fork using pypng: https://gist.github.com/shoyer/7497853

Feel free to merge it in if you like. Your little script has come in very handy for me!

Please sign in to comment on this gist.

Something went wrong with that request. Please try again.