Skip to content

Instantly share code, notes, and snippets.

@tuulos
Created May 10, 2024 00:19
Show Gist options
  • Save tuulos/000f9b691731dc050c6fb72168aa8907 to your computer and use it in GitHub Desktop.
Save tuulos/000f9b691731dc050c6fb72168aa8907 to your computer and use it in GitHub Desktop.
from metaflow import metaflow_runner
from tempfile import NamedTemporaryFile
import ast
# fixme - the runner shouldn't require nest_asyncio in notebooks
import nest_asyncio
nest_asyncio.apply()
def get_cell():
    """Return the latest raw input recorded by the active IPython shell.

    Reads the last entry of the shell's ``history_manager.input_hist_raw``
    list (presumably the cell currently being executed -- confirm against
    IPython's history semantics).

    Returns:
        The last raw input string, or ``None`` when ``get_ipython()`` does
        not return a usable (truthy) shell object.
    """
    # Imported inside the function so IPython is only required when called.
    from IPython import get_ipython

    shell = get_ipython()
    if not shell:
        return None
    raw_inputs = shell.history_manager.input_hist_raw
    return raw_inputs[-1]
def _is_flowspec_base(base):
    """Return True when a class-base AST node refers to ``FlowSpec``.

    Handles both a bare name (``FlowSpec``) and a dotted reference
    (``metaflow.FlowSpec``). Any other kind of base expression (calls,
    subscripts, ...) is treated as "not FlowSpec" instead of raising.
    """
    if isinstance(base, ast.Name):
        return base.id == 'FlowSpec'
    if isinstance(base, ast.Attribute):
        return base.attr == 'FlowSpec'
    return False


def format_flowfile(cell):
    """Turn the source of a notebook cell into a standalone flow script.

    The cell is parsed and the first top-level class that lists ``FlowSpec``
    among its bases is located. Everything in the cell up to and including
    the last line of that class is kept, anything after it is dropped, and a
    ``__main__`` guard that instantiates the class is appended.

    Args:
        cell: Source code of the cell, as a string.

    Returns:
        The source of the generated script, lines joined with ``'\\n'``.

    Raises:
        ValueError: If the cell has no top-level class deriving from
            ``FlowSpec``.
        SyntaxError: If ``cell`` is not parseable by ``ast.parse``.
    """
    flowspec = [
        node for node in ast.parse(cell).body
        if isinstance(node, ast.ClassDef)
        and any(_is_flowspec_base(base) for base in node.bases)
    ]
    if not flowspec:
        raise ValueError("No FlowSpec found in the cell")
    flow_class = flowspec[0]
    # end_lineno is 1-based and inclusive, so slicing up to it keeps the
    # whole class body and cuts off whatever follows it in the cell.
    lines = cell.splitlines()[:flow_class.end_lineno]
    lines += ["if __name__ == '__main__':", f" {flow_class.name}()"]
    return '\n'.join(lines)
# fixme - we can have a more versatile API - this is just a quick demo
def nbrun(flow, **kwargs):
    """Run the flow defined in the current notebook cell via the Runner.

    The cell source obtained from ``get_cell()`` is converted into a
    standalone script with ``format_flowfile()``, written to a temporary
    ``.py`` file, and executed through ``metaflow_runner.Runner``.

    Args:
        flow: The flow class; only its ``__name__`` is used here, as the
            prefix of the temporary file name.
        **kwargs: Forwarded unchanged to ``Runner(...).run()``.

    Returns:
        The ``run`` attribute of the object yielded by the runner's context
        manager, or ``None`` when no cell source is available.

    Note:
        This changes the working directory of the calling process to
        ``/tmp`` and leaves the temporary script on disk (``delete=False``).
    """
    source = get_cell()
    if not source:
        return None

    # fixme - better directory management
    import os
    os.chdir('/tmp')

    tmp_options = dict(
        prefix=flow.__name__, suffix='.py', mode='w', delete=False)
    with NamedTemporaryFile(**tmp_options) as script:
        script.write(format_flowfile(source))
        # Flush so the whole script is on disk before the runner reads it.
        script.flush()
        # NOTE(review): JPY_PARENT_PID is blanked for the run, presumably so
        # the flow does not detect a Jupyter parent process -- confirm.
        runner = metaflow_runner.Runner(
            script.name, env={'JPY_PARENT_PID': ''})
        with runner.run(**kwargs) as executing:
            # fixme - streaming logs
            print(executing.stdout)
            return executing.run
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment