Created
March 3, 2022 23:45
-
-
Save anna-geller/0c922ac90e43a0581b9ec43079cdb5c2 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import prefect | |
from prefect import task, Flow, Parameter, unmapped, Task | |
from prefect.executors import LocalDaskExecutor | |
from prefect.engine.results import LocalResult | |
from prefect.tasks.prefect import ( | |
create_flow_run, | |
wait_for_flow_run, | |
get_task_run_result, | |
) | |
from uuid import uuid4 | |
@task( | |
log_stdout=True, | |
result=LocalResult( | |
location="{flow_run_id}_{task_name}_{task_run_id}.pickle" | |
), | |
) | |
def hello_world(user_input: str, age_input: int): | |
logger = prefect.context.get("logger") | |
result = f"hello {user_input}!. You're {age_input} today" | |
logger.info(result) | |
return result | |
# This is the flow we can import | |
with Flow("dummy-flow-to-be-run-multiple-times") as dummy_flow: | |
user_param = Parameter("user_input", default="world") | |
age_param = Parameter("age_input", default=18) | |
hw = hello_world(user_param, age_param) | |
# This is a flow we use to map the parameters to different runs of the imported flow | |
# The parameters is a list of dictionaries where each dictionary is the set of input paramaters needed to run imported flow | |
with Flow("mapped_flows_example", executor=LocalDaskExecutor()) as flow: | |
parameters = [ | |
dict(user_input="Prefect", age_input=21), | |
dict(user_input="Marvin", age_input=27), | |
dict(user_input="World", age_input=12), | |
] | |
mapped_flows = create_flow_run.map( | |
parameters=parameters, | |
flow_name=unmapped("dummy-flow-to-be-run-multiple-times"), | |
idempotency_key=str(uuid4()), | |
) | |
wait_for_mapped_flows = wait_for_flow_run.map( | |
mapped_flows, raise_final_state=unmapped(True), stream_logs=unmapped(True) | |
) | |
flow_result = get_task_run_result.map( | |
flow_run_id=mapped_flows, | |
task_slug=unmapped("hello_world-1"), | |
upstream_tasks=[wait_for_mapped_flows], | |
) | |
if __name__ == "__main__": | |
dummy_flow.register("community") | |
flow.register("community") | |
# from prefect.backend import FlowRunView | |
# | |
# flow_run = FlowRunView.from_flow_run_id( | |
# "b9b030de-e731-4c76-9464-3d450db296bf" | |
# ) | |
# task_run = flow_run.get_task_run(task_slug='hello_world-1') | |
# x = task_run.get_result() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment