Created
May 10, 2024 00:19
-
-
Save tuulos/000f9b691731dc050c6fb72168aa8907 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from metaflow import metaflow_runner | |
from tempfile import NamedTemporaryFile | |
import ast | |
# fixme - it runner shouldn't require nest_asyncio in notebooks | |
import nest_asyncio | |
nest_asyncio.apply() | |
def get_cell(): | |
from IPython import get_ipython | |
ipython = get_ipython() | |
if ipython: | |
return ipython.history_manager.input_hist_raw[-1] | |
def format_flowfile(cell): | |
flowspec = [x for x in ast.parse(cell).body | |
if isinstance(x, ast.ClassDef) and any(b.id == 'FlowSpec' for b in x.bases)] | |
if not flowspec: | |
# fixme - nicer exception | |
raise Exception("No FlowSpec found in the cell") | |
lines = cell.splitlines()[:flowspec[0].end_lineno] | |
lines += ["if __name__ == '__main__':", f" {flowspec[0].name}()"] | |
return '\n'.join(lines) | |
# fixme - we can have a more versatile API - this is just a quick demo | |
def nbrun(flow, **kwargs): | |
cell = get_cell() | |
if cell: | |
# fixme - better directory management | |
import os | |
os.chdir('/tmp') | |
with NamedTemporaryFile(prefix=flow.__name__, suffix='.py', mode='w', delete=False) as tmp: | |
tmp.write(format_flowfile(cell)) | |
tmp.flush() | |
with metaflow_runner.Runner(tmp.name, env={'JPY_PARENT_PID': ''}).run(**kwargs) as run: | |
# fixme - streaming logs | |
print(run.stdout) | |
return run.run |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment