Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save anna-geller/0c922ac90e43a0581b9ec43079cdb5c2 to your computer and use it in GitHub Desktop.
Save anna-geller/0c922ac90e43a0581b9ec43079cdb5c2 to your computer and use it in GitHub Desktop.
import prefect
from prefect import task, Flow, Parameter, unmapped, Task
from prefect.executors import LocalDaskExecutor
from prefect.engine.results import LocalResult
from prefect.tasks.prefect import (
create_flow_run,
wait_for_flow_run,
get_task_run_result,
)
from uuid import uuid4
@task(
    log_stdout=True,
    result=LocalResult(
        location="{flow_run_id}_{task_name}_{task_run_id}.pickle"
    ),
)
def hello_world(user_input: str, age_input: int):
    """Build a greeting from the two inputs, log it, and return it.

    The task is configured with a ``LocalResult`` whose location template is
    made up of the flow run id, the task name and the task run id.

    Args:
        user_input: Name inserted into the greeting.
        age_input: Age inserted into the greeting.

    Returns:
        The formatted greeting string.
    """
    greeting = f"hello {user_input}!. You're {age_input} today"
    # The logger is looked up from the Prefect context at call time.
    prefect.context.get("logger").info(greeting)
    return greeting
# Child flow: this is the flow other modules can import and run repeatedly.
with Flow("dummy-flow-to-be-run-multiple-times") as dummy_flow:
    # Both parameters have defaults, so the flow also runs without inputs.
    user_param = Parameter("user_input", default="world")
    age_param = Parameter("age_input", default=18)
    hw = hello_world(user_input=user_param, age_input=age_param)
# This is a flow we use to map the parameters to different runs of the imported flow.
# `parameters` is a list of dictionaries where each dictionary is the set of
# input parameters needed for one run of the imported flow.
with Flow("mapped_flows_example", executor=LocalDaskExecutor()) as flow:
    parameters = [
        dict(user_input="Prefect", age_input=21),
        dict(user_input="Marvin", age_input=27),
        dict(user_input="World", age_input=12),
    ]
    # One full UUID per child flow run. A bare string passed to `.map` without
    # `unmapped` is itself mapped over, so each child would receive a single
    # character of the UUID as its idempotency key; such keys collide easily.
    # NOTE: the keys are generated when this module is evaluated (flow build
    # time), not at flow run time.
    idempotency_keys = [str(uuid4()) for _ in parameters]
    # Start one run of the child flow per parameter set.
    mapped_flows = create_flow_run.map(
        parameters=parameters,
        flow_name=unmapped("dummy-flow-to-be-run-multiple-times"),
        idempotency_key=idempotency_keys,
    )
    # Wait for every child run, streaming its logs and raising its final state.
    wait_for_mapped_flows = wait_for_flow_run.map(
        mapped_flows, raise_final_state=unmapped(True), stream_logs=unmapped(True)
    )
    # Fetch the `hello_world-1` task result from each child run, only after the
    # corresponding wait task has completed.
    flow_result = get_task_run_result.map(
        flow_run_id=mapped_flows,
        task_slug=unmapped("hello_world-1"),
        upstream_tasks=[wait_for_mapped_flows],
    )
if __name__ == "__main__":
    # Register the child flow first, then the mapping flow, in the same project.
    for registered_flow in (dummy_flow, flow):
        registered_flow.register("community")
    # Reference snippet (kept commented out): look up the `hello_world-1` task
    # run of one specific flow run and load its result by hand.
    #
    # from prefect.backend import FlowRunView
    #
    # flow_run = FlowRunView.from_flow_run_id(
    # "b9b030de-e731-4c76-9464-3d450db296bf"
    # )
    # task_run = flow_run.get_task_run(task_slug='hello_world-1')
    # x = task_run.get_result()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment