Skip to content

Instantly share code, notes, and snippets.

@wilson
Created August 20, 2021 17:49
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save wilson/eb31dffd2c6526afd30cf13265eb67ab to your computer and use it in GitHub Desktop.
Save wilson/eb31dffd2c6526afd30cf13265eb67ab to your computer and use it in GitHub Desktop.
import prefect
import random
from prefect import task, Flow, Parameter
from prefect.executors import DaskExecutor
from dask_kubernetes import KubeCluster, make_pod_spec
from typing import Iterable
@task
def extract(count: int) -> Iterable[int]:
    """Draw ``count`` distinct integers at random from ``range(100000000)``.

    Stands in for a real extraction step by producing fake records.

    Args:
        count: Number of values to sample.

    Returns:
        A list of ``count`` unique integers, each in ``[0, 100000000)``.

    Raises:
        ValueError: Raised by ``random.sample`` inside the task body when
            ``count`` is negative or larger than the population.
    """
    # Sampling from a range avoids materialising the whole population.
    fake_data = range(100000000)
    return random.sample(fake_data, k=count)
@task
def transform(records: Iterable[int]) -> Iterable[int]:
    """Return a new list with every record incremented by one.

    Args:
        records: The integers to transform.

    Returns:
        A list holding ``n + 1`` for each ``n`` in ``records``, in the
        same order as the input.
    """
    return [n + 1 for n in records]
@task
def load(data: Iterable[int]) -> bool:
    """Log the data instead of persisting it anywhere.

    Args:
        data: The values to "store".

    Returns:
        Always ``True``.
    """
    # The logger is looked up in the Prefect context at call time.
    logger = prefect.context.get("logger")
    logger.info(f"Pretending to store {data}")
    return True
# Wire the three tasks into a flow: sample -> increment -> "store".
# "count" is a flow parameter, so it can be overridden per run.
with Flow("increment a random sample") as f:
    records = extract(Parameter("count", default=100))
    data = transform(records)
    load(data)

# Pod template handed to KubeCluster below.
# NOTE(review): presumably the daskdev image pip-installs the packages listed
# in EXTRA_PIP_PACKAGES at container start -- confirm against the image docs.
# NOTE(review): both the image tag and the prefect install are unpinned, so
# the versions in the pods may differ from the local environment -- confirm
# this is acceptable.
pod_spec = make_pod_spec(
    image='daskdev/dask:latest',
    memory_limit='4G',
    memory_request='4G',
    cpu_limit=1,
    cpu_request=1,
    env={'EXTRA_PIP_PACKAGES': 'prefect'},
)

# Executor configured to build its cluster from KubeCluster with these kwargs.
executor = DaskExecutor(
    cluster_class=KubeCluster,
    cluster_kwargs=dict(
        namespace="prefect",
        pod_template=pod_spec,
        n_workers=1,
    ),
)

# Runs at module level (import time), passing count=100 and the executor.
f.run(count=100, executor=executor)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment