|
from contextlib import contextmanager |
|
from nbconvert.preprocessors.execute import ( |
|
ExecutePreprocessor, |
|
CellExecutionComplete, |
|
Empty, |
|
) |
|
from panflute import CodeBlock, convert_text, Div |
|
from panflute.tools import meta2builtin |
|
import yaml |
|
|
|
|
|
class SourceExecuter(ExecutePreprocessor):
    """This is a first stab at an executor that runs directly on source code.

    Unlike the parent preprocessor, the methods here take a plain source
    string and collect outputs into a plain list, rather than operating on
    notebook cell objects.

    Typical use::

        executer = SourceExecuter(kernel_name="python")
        with executer.setup_preprocessor():
            exec_reply, outputs = executer.run_cell("1 + 1")
    """

    @contextmanager
    def setup_preprocessor(self):
        """Start a kernel for the duration of a ``with`` block.

        Resets the display-id and widget bookkeeping, starts a new kernel via
        ``self.start_new_kernel`` and stores the resulting manager and client
        on ``self.km`` / ``self.kc``.

        Yields:
            The ``(km, kc)`` pair returned by ``self.start_new_kernel``.

        On exit (normal or exceptional) the client channels are stopped, the
        kernel is shut down, and the ``km`` / ``kc`` attributes are removed.
        """
        self._display_id_map = {}
        self.widget_state = {}
        self.widget_buffers = {}
        self.km, self.kc = self.start_new_kernel(cwd=None)
        try:
            yield self.km, self.kc
        finally:
            # Nested try/finally: each teardown step must run even if the
            # previous one raises, otherwise a failure in stop_channels()
            # would leave the kernel running and stale attributes behind.
            try:
                self.kc.stop_channels()
            finally:
                try:
                    self.km.shutdown_kernel(
                        now=self.shutdown_kernel == "immediate"
                    )
                finally:
                    delattr(self, "km")
                    delattr(self, "kc")

    def run_cell(self, source, cell_index=None):
        """Execute ``source`` on the running kernel and gather its outputs.

        Must be called while ``setup_preprocessor`` is active, since it uses
        ``self.kc``.

        Args:
            source: The code string sent to the kernel for execution.
            cell_index: Passed through unchanged to ``process_message``.

        Returns:
            A tuple ``(exec_reply, outputs)``: the value returned by
            ``self._wait_for_reply`` for this execution, and the list that
            ``process_message`` was given to fill with outputs.

        Raises:
            RuntimeError: If waiting for an IOPub message times out and
                ``self.raise_on_iopub_timeout`` is truthy.
        """
        parent_msg_id = self.kc.execute(source)
        self.log.debug("Executing cell:\n%s", source)
        exec_reply = self._wait_for_reply(parent_msg_id)
        outputs = []
        self.clear_before_next_output = False

        while True:
            try:
                msg = self.kc.iopub_channel.get_msg(timeout=self.iopub_timeout)
            except Empty:
                self.log.warning("Timeout waiting for IOPub output")
                if self.raise_on_iopub_timeout:
                    raise RuntimeError("Timeout waiting for IOPub output")
                # Best effort: give up waiting and return what we have so far.
                break
            if msg["parent_header"].get("msg_id") != parent_msg_id:
                # not an output from our execution
                continue
            # Will raise CellExecutionComplete when completed
            try:
                self.process_message(msg, outputs, cell_index)
            except CellExecutionComplete:
                break

        return exec_reply, outputs

    def process_message(self, msg, outputs, cell_index):
        """Dispatch a single IOPub message belonging to the current execution.

        Args:
            msg: The message dict; ``msg["msg_type"]`` and ``msg["content"]``
                are read.
            outputs: The list of outputs for the current execution, handed on
                to ``self.handle_comm_msg`` / ``self.output``.
            cell_index: Passed through unchanged to those handlers.

        Returns:
            The result of ``self.output(...)`` for message types that are
            treated as outputs; ``None`` otherwise.

        Raises:
            CellExecutionComplete: When a ``status`` message reports the
                ``idle`` execution state.
        """
        msg_type = msg["msg_type"]
        self.log.debug("msg_type: %s", msg_type)
        content = msg["content"]
        self.log.debug("content: %s", content)

        display_id = content.get("transient", {}).get("display_id", None)
        if display_id and msg_type in {
            "execute_result",
            "display_data",
            "update_display_data",
        }:
            self._update_display_id(display_id, msg)

        if msg_type == "status":
            if content["execution_state"] == "idle":
                raise CellExecutionComplete()
        elif msg_type.startswith("comm"):
            self.handle_comm_msg(outputs, msg, cell_index)
        # Check for remaining messages we don't process
        elif msg_type not in ["execute_input", "update_display_data"]:
            # Assign output as our processed "result"
            return self.output(outputs, msg, display_id, cell_index)
|
|
|
|
|
def insert_missing_meta(content, doc_level_metadata):
    """Prepend a YAML metadata code block to ``content`` (in place).

    The block's text is the YAML dump of a copy of
    ``doc_level_metadata["imd"]["global_meta"]``; a missing ``imd`` or
    ``global_meta`` key is treated as an empty mapping.

    Args:
        content: Mutable list of document elements making up one cell.
        doc_level_metadata: Mapping of document-level metadata.
    """
    imd_section = doc_level_metadata.get("imd", {})
    global_meta = dict(imd_section.get("global_meta", {}))
    meta_block = CodeBlock(
        yaml.safe_dump(global_meta),
        classes=["yaml", "metadata"],
    )
    content.insert(0, meta_block)
|
|
|
|
|
def _make_cell(content, number, kind):
    """Wrap ``content`` in an ``nb-cell`` Div of the given kind.

    ``kind`` is the second class on the Div ("markdown" or "code"), and
    ``number`` is stored as the string attribute ``cell-number``.
    """
    return Div(
        *content,
        attributes={"cell-number": str(number)},
        classes=["nb-cell", kind],
    )


# Step 1: parse the markdown source into a panflute document.
# UTF-8 is given explicitly so the result does not depend on the platform's
# locale encoding.
with open("test.md", encoding="utf-8") as handle:
    doc = convert_text(handle.read(), input_format="markdown", standalone=True)

doc_metadata = meta2builtin(doc.metadata)
doc_content = []  # finished cells (Divs), in document order
cell_content = []  # elements collected for the cell currently being built
found_metadata = False  # True once the current cell starts with a yaml block
cell_number = 1

executer = SourceExecuter(
    kernel_name=doc_metadata.get("kernelspec", {}).get("name", "python")
)

# Step 2: walk the top-level elements, grouping them into markdown/code cells
# and executing python code blocks on a single kernel session.
with executer.setup_preprocessor():
    for element in doc.content:
        if isinstance(element, CodeBlock) and "yaml" in element.classes:
            # A yaml block closes whatever cell was being built and becomes
            # the metadata of the next one.
            if not found_metadata:
                # add missing metadata element to previous cell
                insert_missing_meta(cell_content, doc_metadata)
            # save previous cell
            doc_content.append(_make_cell(cell_content, cell_number, "markdown"))
            # add global metadata to code metadata
            # (safe_load returns None for an empty block, hence the fallback)
            cell_meta = yaml.safe_load(element.text) or {}
            cell_meta.update(doc_metadata.get("imd", {}).get("global_meta", {}))
            element.text = yaml.safe_dump(cell_meta)
            # start a new cell
            cell_number += 1
            cell_content = [element]
            found_metadata = True
        elif isinstance(element, CodeBlock):
            # handle if a code cell, with no metadata, is before a markdown cell,
            # also with no metadata
            if (cell_content and not found_metadata) or len(cell_content) > 1:
                # The pending elements form a markdown cell of their own; the
                # code block then starts a fresh cell with default metadata.
                if not found_metadata:
                    insert_missing_meta(cell_content, doc_metadata)
                doc_content.append(
                    _make_cell(cell_content, cell_number, "markdown")
                )
                cell_number += 1
                cell_content = []
                insert_missing_meta(cell_content, doc_metadata)
            elif not found_metadata:
                insert_missing_meta(cell_content, doc_metadata)
            cell_content.append(element)
            # run cell, and add outputs to document
            if "python" in element.classes:
                _exec_reply, outputs = executer.run_cell(element.text, cell_number)
                # NOTE(review): str() produces a Python repr, which is not
                # necessarily valid JSON despite the "json" class. Kept as-is
                # in case a later step relies on this exact format.
                cell_content.append(
                    CodeBlock(str(outputs), classes=["json", "outputs"])
                )
            doc_content.append(_make_cell(cell_content, cell_number, "code"))
            # start a new cell
            cell_number += 1
            cell_content = []
            found_metadata = False
        else:
            cell_content.append(element)

    # Flush whatever follows the last code block as a final markdown cell, so
    # trailing content is not silently dropped from the output document.
    if cell_content:
        if not found_metadata:
            insert_missing_meta(cell_content, doc_metadata)
        doc_content.append(_make_cell(cell_content, cell_number, "markdown"))

doc.content = doc_content

# Step 3: write the restructured document back out as markdown.
with open("test_step1.md", "w", encoding="utf-8") as handle:
    handle.write(
        convert_text(
            doc, input_format="panflute", output_format="markdown", standalone=True
        )
    )