@zzstoatzz
Last active February 6, 2024 10:41
generics in prefect
from typing import Generic, Sequence, TypeVar

from pydantic import BaseModel, TypeAdapter
from prefect import flow, task

ResultType = TypeVar("ResultType")


class Result(BaseModel, Generic[ResultType]):
    data: Sequence[ResultType]


@task
def query() -> Result[str]:
    '''provide `str` as the otherwise generic type parameter for `Result`'''
    return Result[str](data='1 2 3'.split())


@task
def process_result(raw_result: Result[str]) -> Result[int]:
    '''provide `int` as the otherwise generic type parameter for `Result`'''
    # re-validate the dumped dict as Result[int]; pydantic coerces '1' -> 1
    return TypeAdapter(Result[int]).validate_python(raw_result.model_dump())


@flow(log_prints=True)
def work():
    future = query.submit()
    # prefect resolves the future to its Result[str] before the task body runs
    processed_result = process_result(future)  # type: ignore
    print(processed_result)
    assert isinstance(processed_result, Result)
    assert processed_result.data == [1, 2, 3]


if __name__ == "__main__":
    work()
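
The conversion inside `process_result` does not depend on Prefect, so it can be tried on its own. Below is a minimal sketch, assuming pydantic v2 (as in the `pip list` output in the comment); `to_int_result` and `failed` are hypothetical names introduced for illustration and are not part of the gist. It also shows what probably happens when the strings are not numeric: validation raises instead of silently passing bad data along.

```python
from typing import Generic, Sequence, TypeVar

from pydantic import BaseModel, TypeAdapter, ValidationError

ResultType = TypeVar("ResultType")


class Result(BaseModel, Generic[ResultType]):
    data: Sequence[ResultType]


def to_int_result(raw: Result[str]) -> Result[int]:
    # hypothetical helper: same body as `process_result`, minus the @task decorator
    return TypeAdapter(Result[int]).validate_python(raw.model_dump())


# numeric strings are coerced to ints by pydantic's default (lax) validation
converted = to_int_result(Result[str](data="1 2 3".split()))

# non-numeric strings cannot be coerced, so validation fails
failed = False
try:
    to_int_result(Result[str](data=["a", "b"]))
except ValidationError:
    failed = True
```

`Result[int].model_validate(raw.model_dump())` should behave the same way for this model; `TypeAdapter` is the more general tool because it also accepts types that are not `BaseModel` subclasses.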
zzstoatzz commented Feb 6, 2024

(bleeding-prefect) pad-2 :: testing/prefect-sandbox/testing-prefectmain*›
» pip list | rg 'prefect|pydantic'
prefect                    2.14.20
pydantic                   2.6.1
pydantic_core              2.16.2
pydantic-settings          2.1.0

» python generics.py
01:54:15.299 | INFO    | prefect.engine - Created flow run 'eggplant-smilodon' for flow 'work'
01:54:15.838 | INFO    | Flow run 'eggplant-smilodon' - Created task run 'query-0' for task 'query'
01:54:15.841 | INFO    | Flow run 'eggplant-smilodon' - Submitted task run 'query-0' for execution.
01:54:16.445 | INFO    | Flow run 'eggplant-smilodon' - Created task run 'process_result-0' for task 'process_result'
01:54:16.445 | INFO    | Flow run 'eggplant-smilodon' - Executing 'process_result-0' immediately...
01:54:16.693 | INFO    | Task run 'query-0' - Finished in state Completed()
01:54:17.033 | INFO    | Task run 'process_result-0' - Finished in state Completed()
01:54:17.050 | INFO    | Flow run 'eggplant-smilodon' - data=[1, 2, 3]
01:54:17.480 | INFO    | Flow run 'eggplant-smilodon' - Finished in state Completed('All states completed.')
