public
Last active — forked from minrk/ipnbdoctest.py

  • Download Gist
ipnbdoctest.py
Python
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219
#!/usr/bin/env python
"""
Simple example script for running and testing a notebook, saving the results to a new notebook.
 
Usage: `ipnbdoctest.py foo.ipynb foo_new.ipynb`
 
Each cell is submitted to the kernel, and the outputs are compared with those stored in the notebook.
"""
 
import io
import os,sys,time
import base64
import re
 
from collections import defaultdict
from Queue import Empty
 
try:
from IPython.kernel import KernelManager
except ImportError:
from IPython.zmq.blockingkernelmanager import BlockingKernelManager as KernelManager
 
from IPython.nbformat.current import reads, NotebookNode, write
 
 
def compare_png(a64, b64):
    """Compare two base64-encoded PNGs.

    Returns True when both payloads decode to exactly the same bytes and
    False otherwise.  This is a byte-level comparison, not a pixel-level
    one: two images that render identically but were encoded differently
    (for example with different metadata chunks) compare as unequal.

    a64, b64 -- base64-encoded image data (bytes or ASCII text).
    """
    # b64decode replaces the deprecated decodestring alias, which recent
    # Python releases no longer provide.
    adata = base64.b64decode(a64)
    bdata = base64.b64decode(b64)
    return adata == bdata
 
def sanitize(s):
    """Sanitize a string for comparison.

    Fixes universal newlines, strips trailing newlines, and normalizes
    likely random values (memory addresses and UUIDs, in either letter
    case).  Values that are not strings are returned unchanged, so any
    output field can be passed through this function without raising.
    """
    # type(u'') is ``unicode`` on Python 2 and ``str`` on Python 3.
    if not isinstance(s, (str, type(u''))):
        return s
    # normalize newline:
    s = s.replace('\r\n', '\n')
    # ignore trailing newlines (but not space)
    s = s.rstrip('\n')
    # normalize hex addresses (the digits may be printed in upper case):
    s = re.sub(r'0x[a-fA-F0-9]+', '0xFFFFFFFF', s)
    # normalize UUIDs (upper- or lower-case digits):
    s = re.sub(
        r'[a-fA-F0-9]{8}(\-[a-fA-F0-9]{4}){3}\-[a-fA-F0-9]{12}',
        'U-U-I-D',
        s,
    )
    return s
 
 
def consolidate_outputs(outputs):
    """Consolidate outputs into a summary dict (incomplete).

    outputs -- iterable of mapping-style output records, each tagged with
               an 'output_type' key.

    Returns a defaultdict containing:
      * one entry per stream name ('stdout' and 'stderr' are always
        present) holding the concatenated stream text;
      * 'pyerr' -- dict(ename=..., evalue=...) of the last error seen;
      * 'png', 'svg', 'latex', 'html', 'javascript', 'text', 'jpeg' --
        a list of the payloads found under that key, in order.
    """
    data = defaultdict(list)
    data['stdout'] = ''
    data['stderr'] = ''
    for out in outputs:
        # Outputs carry their kind under 'output_type' (the key run_cell()
        # assigns when it builds them); there is no 'type' field.
        kind = out['output_type']
        if kind == 'stream':
            name = out['stream']
            # .get() bypasses the list default, so an unexpected stream
            # name still accumulates text instead of a list of characters.
            data[name] = data.get(name, '') + out['text']
        elif kind == 'pyerr':
            data['pyerr'] = dict(ename=out['ename'], evalue=out['evalue'])
        else:
            for key in ('png', 'svg', 'latex', 'html', 'javascript', 'text', 'jpeg',):
                if key in out:
                    data[key].append(out[key])
    return data
 
 
def compare_outputs(test, ref, skip_compare=('png', 'traceback', 'latex', 'prompt_number')):
    """Check a freshly produced output against its stored reference.

    test         -- the output produced by this run.
    ref          -- the output stored in the notebook.
    skip_compare -- keys that must be present in *test* but whose values
                    are not compared.

    Every key of *ref* has to exist in *test*; values of keys outside
    *skip_compare* must be equal once both sides went through sanitize().
    The first problem found is printed and False is returned; True means
    no problem was found.
    """
    for name in ref:
        if name not in test:
            print("missing key: %s != %s" % (test.keys(), ref.keys()))
            return False
        if name in skip_compare:
            # presence is enough for these keys
            continue
        if sanitize(test[name]) != sanitize(ref[name]):
            print("mismatch %s:" % name)
            print(test[name])
            print(' != ')
            print(ref[name])
            return False
    return True
 
 
def run_cell(shell, iopub, cell):
    """Execute one cell and collect what it publishes on the iopub channel.

    shell -- channel used to submit ``cell.input`` and to wait for a message.
    iopub -- channel the output messages are read from.
    cell  -- notebook cell; only its ``input`` attribute is used.

    Returns a list of NotebookNode outputs, one per collected message.
    A 'clear_output' message discards everything collected so far.
    Whatever ``shell.get_msg`` raises when no message arrives within
    20 seconds is not caught here.
    """
    shell.execute(cell.input)
    # wait for finish, maximum 20s
    shell.get_msg(timeout=20)

    collected = []
    while True:
        try:
            message = iopub.get_msg(timeout=0.2)
        except Empty:
            # nothing arrived within 0.2 s: stop collecting
            return collected

        kind = message['msg_type']
        if kind == 'clear_output':
            collected = []
            continue
        if kind in ('status', 'pyin'):
            continue

        payload = message['content']
        node = NotebookNode(output_type=kind)
        if kind == 'stream':
            node.stream = payload['name']
            node.text = payload['data']
        elif kind in ('display_data', 'pyout'):
            for mime, value in payload['data'].items():
                # Use the last mime component as attribute name, e.g.
                # 'image/svg+xml' -> 'svg' and 'text/plain' -> 'text'.
                subtype = mime.split('/')[-1].lower()
                subtype = subtype.replace('+xml', '').replace('plain', 'text')
                setattr(node, subtype, value)
            # TODO: prompt_number is not copied from
            # content['execution_count'] for 'pyout'; need to find a
            # better workaround.
        elif kind == 'pyerr':
            node.ename = payload['ename']
            node.evalue = payload['evalue']
            node.traceback = payload['traceback']
        else:
            # unknown message types are reported but still recorded
            print("unhandled iopub msg: %s" % kind)
        collected.append(node)
 
def test_notebook(nb):
    """Run every code cell of *nb* in a fresh kernel and compare outputs.

    For each code cell the freshly produced outputs are compared with the
    ones stored in the cell, then the stored outputs and prompt number are
    replaced by the new ones, so *nb* is modified in place.  A summary is
    printed to stdout; nothing is returned.

    A cell whose execution raises is counted as an error and gets a single
    'pyerr' output describing the exception.  The kernel and its channels
    are shut down even when an unexpected exception escapes.
    """
    kc = None
    km = KernelManager()
    # Keep the devnull handle in a ``with`` block so it is closed once the
    # kernel that writes to it has been shut down.
    with open(os.devnull, 'w') as devnull:
        km.start_kernel(extra_arguments=['--pylab=inline'], stderr=devnull)
        try:
            try:
                kc = km.client()
                kc.start_channels()
                iopub = kc.iopub_channel
            except AttributeError:
                # IPython 0.13
                kc = km
                kc.start_channels()
                iopub = kc.sub_channel
            shell = kc.shell_channel

            # Execute a no-op and drain iopub until it stays quiet for one
            # second -- presumably so that kernel start-up messages are not
            # mistaken for output of the first cell.
            shell.execute("pass")
            shell.get_msg()
            while True:
                try:
                    iopub.get_msg(timeout=1)
                except Empty:
                    break

            successes = 0
            failures = 0
            errors = 0
            prompt_number = 1
            for ws in nb.worksheets:
                for cell in ws.cells:
                    if cell.cell_type != 'code':
                        continue
                    try:
                        outs = run_cell(shell, iopub, cell)
                    except Exception as e:
                        print("failed to run cell: %s" % repr(e))
                        print(cell.input)
                        errors += 1
                        # Record the failure as a 'pyerr' output (same
                        # fields run_cell() uses) rather than the raw
                        # exception object, which cannot be serialized
                        # when the notebook is written out as JSON.
                        cell.outputs = [NotebookNode(
                            output_type='pyerr',
                            ename=type(e).__name__,
                            evalue=str(e),
                            traceback=[],
                        )]
                        continue

                    # NOTE: zip() stops at the shorter list, so a differing
                    # number of outputs is not reported as a mismatch.
                    failed = False
                    for out, ref in zip(outs, cell.outputs):
                        if not compare_outputs(out, ref):
                            failed = True
                    if failed:
                        failures += 1
                    else:
                        successes += 1
                    sys.stdout.write('.')

                    cell.outputs = outs
                    cell.prompt_number = prompt_number
                    if cell.outputs:
                        cell.outputs[0]['prompt_number'] = prompt_number
                    prompt_number += 1

            print("")
            print("tested notebook %s" % nb.metadata.name)
            print(" %3i cells successfully replicated" % successes)
            if failures:
                print(" %3i cells mismatched output" % failures)
            if errors:
                print(" %3i cells failed to complete" % errors)
        finally:
            # Always release the kernel, even if stopping the channels fails.
            try:
                if kc is not None:
                    kc.stop_channels()
            finally:
                km.shutdown_kernel()
 
if __name__ == '__main__':
    import argparse

    # Command line: the notebook to execute, then the path the re-run
    # notebook is saved to.
    cli = argparse.ArgumentParser(
        description='Run iPython notebook '
                    'non-interactively and save results to new notebook')
    cli.add_argument('input_ipynb', help='iPython notebook file to run')
    cli.add_argument('output_ipynb', help='iPython notebook file to save')
    args = cli.parse_args()

    # Load the input notebook as JSON.
    with open(args.input_ipynb) as source:
        print("testing %s" % args.input_ipynb)
        nb = reads(source.read(), 'json')

    # Running the test replaces the cell outputs inside ``nb``.
    test_notebook(nb)

    # Save the updated notebook as UTF-8 encoded JSON.
    with io.open(args.output_ipynb, 'w', encoding='utf8') as target:
        write(nb, target, 'json')

Forked from https://gist.github.com/minrk/2620735 and changed code to save output to another notebook. Current version appears to have broken the test code, but the output code works.

Please sign in to comment on this gist.

Something went wrong with that request. Please try again.