Skip to content

Instantly share code, notes, and snippets.

@anna-geller
Created November 30, 2022 01:16
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save anna-geller/be068a3b3ce38c817754e24ff096091c to your computer and use it in GitHub Desktop.
Save anna-geller/be068a3b3ce38c817754e24ff096091c to your computer and use it in GitHub Desktop.
from prefect import task, flow, get_run_logger
from prefect_dask import DaskTaskRunner
# Define some tasks for us to run in our flow
@task
def extract() -> list:
    """Produce the fixed sample input for the pipeline.

    Logs ``"extract"`` through the Prefect run logger.

    Returns:
        The integers 1 through 6, in ascending order.
    """
    run_logger = get_run_logger()
    run_logger.info("extract")
    return list(range(1, 7))
@task
def transform(number: int) -> int:
    """Double one value, logging which value is being processed.

    Args:
        number: The value to double.

    Returns:
        ``number * 2``.
    """
    # Log first, so the message is emitted even if the multiplication fails.
    task_logger = get_run_logger()
    task_logger.info(f"transform {number}")
    doubled = number * 2
    return doubled
@task
def load(numbers: list) -> list:
    """Collect the incoming values, keeping only the truthy ones.

    Logs ``"load"`` through the Prefect run logger.

    Args:
        numbers: Values to load.

    Returns:
        A new list with falsy entries (e.g. ``0`` or ``None``) dropped and the
        original order preserved.
    """
    get_run_logger().info("load")
    # filter(None, ...) keeps exactly the items whose truth value is True.
    return list(filter(None, numbers))
# DaskTaskRunner() is used here without arguments. To run on an already running
# Dask cluster instead, pass its scheduler address, e.g.
# DaskTaskRunner(address="tcp://daskcluster-scheduler:8786").
@flow(task_runner=DaskTaskRunner())
def dask_flow_map() -> list:
    """Run extract -> transform (mapped, twice) -> load on a Dask task runner.

    Each element produced by ``extract`` is doubled by ``transform``, and the
    results are doubled again by a second mapped ``transform``. The mapped
    results are then passed to ``load``.

    Returns:
        The list returned by ``load``. For the hard-coded input of ``extract``
        this is expected to be ``[4, 8, 12, 16, 20, 24]``.
    """
    numbers = extract()
    # ``.map`` fans out one ``transform`` task run per element of its input.
    transformed_numbers = transform.map(numbers)
    numbers_twice = transform.map(transformed_numbers)
    # Return the loaded data instead of discarding it in an unused local.
    return load(numbers=numbers_twice)
if __name__ == "__main__":
    # Trigger a flow run only when this file is executed as a script, not on import.
    dask_flow_map()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment