Skip to content

Instantly share code, notes, and snippets.

@emcake
Created July 18, 2020 00:40
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save emcake/bf53d8484fbb33c7b14bd1940eb9f39c to your computer and use it in GitHub Desktop.
Save emcake/bf53d8484fbb33c7b14bd1940eb9f39c to your computer and use it in GitHub Desktop.
import prefect
from prefect import Flow, task, Task, case, Parameter
from prefect.environments import LocalEnvironment
from prefect.environments.storage import S3
from prefect.tasks.prefect import FlowRunTask
from prefect.tasks.control_flow import merge
from prefect.engine.results import PrefectResult
# Child flow "subflow-one": takes a single parameter 'x' and squares it.
# The task is given the explicit slug 'result' and a PrefectResult; the
# 'super-flow' defined later in this file looks task runs up by that slug.
with Flow('subflow-one') as subflow_one:
    x = Parameter('x')

    @task(
        slug='result',
        result=PrefectResult(),
    )
    def result(x):
        """Return ``x`` raised to the power 2."""
        return x ** 2

    # Calling the task inside the flow context wires the parameter into it.
    result(x)

# Runs as a module-level side effect: registers the flow under project 'mattk'.
subflow_one.register(project_name='mattk')
# Child flow "subflow-two": takes a single parameter 'x' and computes
# x cubed plus 46. As in the other child flow, the task uses the explicit
# slug 'result' and a PrefectResult.
with Flow('subflow-two') as subflow_two:
    x = Parameter('x')

    @task(
        slug='result',
        result=PrefectResult(),
    )
    def result(x):
        """Return ``x ** 3 + 46``."""
        return x ** 3 + 46

    # Calling the task inside the flow context wires the parameter into it.
    result(x)

# Runs as a module-level side effect: registers the flow under project 'mattk'.
subflow_two.register(project_name='mattk')
# Parent flow "super-flow": builds the parameter dict for a child flow, starts
# that child flow by name through FlowRunTask, then reads back the value
# produced by the child's task whose slug is 'result'.
with Flow('super-flow') as flow:

    @task
    def format_params(n):
        """Wrap ``n`` in the parameter mapping the child flows expect: ``{'x': n}``."""
        return {'x': n}

    @task
    def get_result_from_subflow(flow_run_id):
        """Return the result of the 'result' task from the given flow run.

        Fetches the flow run's info through a fresh ``prefect.Client``, scans
        its task runs for the slug ``'result'``, loads the stored result onto
        the task-run state and returns ``state.result``.

        Returns ``None`` when no task run with that slug is seen. If several
        task runs match, the value from the last one iterated is returned.

        Raises:
            ValueError: if a matching task run's state does not hold a
                ``PrefectResult``.
        """
        client = prefect.Client()
        run_info = client.get_flow_run_info(flow_run_id)

        found = None
        # NOTE(review): `.to_list()` assumes `task_runs` is not a plain Python
        # list -- confirm against the installed Prefect version.
        for task_run in run_info.task_runs.to_list():
            if task_run.task_slug != 'result':
                continue

            # Hydrate the state's result before inspecting it.
            task_run.state.load_result()
            if not isinstance(task_run.state._result, PrefectResult):
                raise ValueError('expected PrefectResult')
            found = task_run.state.result

        return found

    # Flow inputs: the number handed to the child flow, plus the child flow's
    # project ('flow_group') and name ('flow_name').
    n = Parameter('input', default=1)
    group = Parameter('flow_group')
    name = Parameter('flow_name')

    formatted_parameters = format_params(n)

    # The 'flow_group' parameter is what gets passed as `project_name`.
    # NOTE(review): the downstream task treats this task's output as a flow
    # run id -- confirm that is what FlowRunTask yields when wait=True.
    subflow_run = FlowRunTask(wait=True)(
        flow_name=name,
        project_name=group,
        parameters=formatted_parameters,
    )
    result = get_result_from_subflow(subflow_run)

# Runs as a module-level side effect: registers the flow under project 'mattk'.
flow.register(project_name='mattk')
@gryBox
Copy link

gryBox commented Jul 18, 2020

This looks really neat. Thank you again

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment