Skip to content

Instantly share code, notes, and snippets.

@wilson
Created August 27, 2021 18:47
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save wilson/b28fd38c4619e6056e88ab1aed5d79ef to your computer and use it in GitHub Desktop.
Save wilson/b28fd38c4619e6056e88ab1aed5d79ef to your computer and use it in GitHub Desktop.
import prefect
import random
from dask_kubernetes import KubeCluster, make_pod_spec
from prefect import task, Flow, Parameter
from prefect.client import Secret
from prefect.executors import DaskExecutor
from prefect.storage.s3 import S3
from prefect.run_configs import KubernetesRun
from typing import Iterable
@task
def extract(count: int) -> Iterable[int]:
    """Return ``count`` integers sampled at random from ``range(100000000)``.

    ``random.sample`` draws without replacement, so the values are distinct.
    """
    population = range(100000000)
    sample = random.sample(population, k=count)
    return sample
@task
def transform(records: Iterable[int]) -> Iterable[int]:
    """Return a new list holding every value of ``records`` incremented by one."""
    incremented = [value + 1 for value in records]
    return incremented
@task
def load(data: Iterable[int]) -> bool:
    """Log ``data`` through the logger found in the Prefect context.

    Nothing is persisted; the function always returns ``True``.
    """
    # The logger is looked up from the Prefect context under the "logger" key.
    prefect.context.get("logger").info(f"Pretending to store {data}")
    return True
# Pod template handed to the Dask KubeCluster below: requests and limits are
# pinned to the same values (1 CPU, 4G of memory).
pod_spec = make_pod_spec(
    image="daskdev/dask:latest",
    # NOTE(review): presumably the daskdev/dask image pip-installs the packages
    # named in EXTRA_PIP_PACKAGES at start-up -- confirm against the image docs.
    env={"EXTRA_PIP_PACKAGES": "prefect dask-kubernetes"},
    cpu_request=1,
    cpu_limit=1,
    memory_request="4G",
    memory_limit="4G",
)
# Executor that builds its cluster from KubeCluster, passing the pod template
# above plus the target namespace and worker count as cluster keyword arguments.
executor = DaskExecutor(
    cluster_class=KubeCluster,
    cluster_kwargs={
        "namespace": "prefect",
        "pod_template": pod_spec,
        "n_workers": 1,
    },
)
# Linear pipeline: extract -> transform -> load, sized by the "count" parameter.
with Flow("increment a random sample") as f:
    sample_size = Parameter("count", default=100)
    records = extract(sample_size)
    data = transform(records)
    load(data)
# AWS credentials are fetched from the Secrets named "aws_key" and "aws_secret"
# and forwarded to the S3 storage as client options.
boto_options = dict(
    aws_access_key_id=Secret("aws_key").get(),
    aws_secret_access_key=Secret("aws_secret").get(),
)

# The flow is stored as a script: the local file ./demo_flow.py is the source
# associated with the given bucket and key.
f.storage = S3(
    "addepar-research-data",
    key="prefect/flows/demo/random-sample-wlb",
    client_options=boto_options,
    stored_as_script=True,
    local_script_path="./demo_flow.py",
)
# Attach the Dask executor and the Kubernetes run configuration to the flow.
# The run config uses the same image and EXTRA_PIP_PACKAGES value as the
# worker pod template.
f.executor = executor
f.run_config = KubernetesRun(
    image="daskdev/dask:latest",
    env={"EXTRA_PIP_PACKAGES": "prefect dask-kubernetes"},
)
# Only register and run when this file is executed directly as a script.
# The flow is stored with stored_as_script=True, so this same file is the
# flow's source and its top-level code is executed again whenever the flow is
# loaded from storage (or the module is imported); registration and the run
# must not be re-triggered in that case.
if __name__ == "__main__":
    # Register the flow under the "demo" project.
    f.register("demo")
    # Run the flow once, supplying 100 for the "count" parameter.
    f.run(count=100)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment