Skip to content

Instantly share code, notes, and snippets.

  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
Star You must be signed in to star a gist
Save anna-geller/5ab8c8e0d9cd65bc197bdb37a510f11e to your computer and use it in GitHub Desktop.
import prefect
from prefect import task, Flow, Parameter, unmapped, Task
from prefect.executors import LocalDaskExecutor
from prefect.engine.results import LocalResult
from prefect.tasks.prefect import (
create_flow_run,
wait_for_flow_run,
get_task_run_result,
)
from uuid import uuid4
def _unique_result_location(**_context) -> str:
    """Return a fresh UUID-based file name for one result write.

    Prefect invokes a callable ``location`` with its formatting kwargs when
    the result is written; they are ignored here because a random name needs
    no context.
    """
    return str(uuid4())


@task(
    log_stdout=True,
    # Use a callable so the location is computed per write. A plain
    # ``str(uuid4())`` is evaluated only once, when this module is loaded, so
    # every run of the task would persist its result under the same file name
    # and runs would overwrite each other's results.
    result=LocalResult(location=_unique_result_location),
)
def hello_world(user_input: str, age_input: int) -> str:
    """Build a greeting, log it at INFO level, and return it.

    Args:
        user_input: Name inserted into the greeting.
        age_input: Age inserted into the greeting.

    Returns:
        The greeting string that was logged.
    """
    logger = prefect.context.get("logger")
    result = f"hello {user_input}!. You're {age_input} today"
    logger.info(result)
    return result
# Importable flow: a single `hello_world` task whose two inputs are exposed as
# flow parameters (defaults: "world" and 18).
with Flow("dummy-flow-to-be-run-multiple-times") as dummy_flow:
    user_param = Parameter("user_input", default="world")
    age_param = Parameter("age_input", default=18)
    # Bind the parameters to the task's arguments by name.
    hw = hello_world(user_input=user_param, age_input=age_param)
# This flow maps sets of parameters onto separate runs of the imported flow.
# `parameters` is a list of dictionaries; each dictionary holds the input
# parameters needed for one run of the imported flow.
with Flow("mapped_flows_example", executor=LocalDaskExecutor()) as flow:
    parameters = [
        dict(user_input="Prefect", age_input=21),
        dict(user_input="Marvin", age_input=27),
        dict(user_input="World", age_input=12),
    ]

    # One child flow run is created per entry in `parameters`.
    # No explicit idempotency_key is passed: a single `str(uuid4())` built
    # here would be fixed when the flow is defined, so it could not give each
    # child run (or each later run of this parent flow) its own key. Without
    # it, create_flow_run falls back to its documented default, the active
    # task run id.
    mapped_flows = create_flow_run.map(
        parameters=parameters,
        flow_name=unmapped("dummy-flow-to-be-run-multiple-times"),
    )

    # Wait on every created flow run, streaming its logs into this flow run.
    wait_for_mapped_flows = wait_for_flow_run.map(
        mapped_flows, raise_final_state=unmapped(True), stream_logs=unmapped(True)
    )

    # Fetch the `hello_world` result of each child run; the upstream
    # dependency keeps this from starting before the matching wait task.
    flow_result = get_task_run_result.map(
        flow_run_id=mapped_flows,
        task_slug=unmapped("hello_world-1"),
        upstream_tasks=[wait_for_mapped_flows],
    )
if __name__ == "__main__":
    # Register both flows under the "community" project: the imported flow
    # first, then the flow that maps over it.
    for flow_to_register in (dummy_flow, flow):
        flow_to_register.register("community")
# from prefect.backend import FlowRunView
#
# flow_run = FlowRunView.from_flow_run_id(
# "b9b030de-e731-4c76-9464-3d450db296bf"
# )
# task_run = flow_run.get_task_run(task_slug='hello_world-1')
# x = task_run.get_result()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment