@Ogaday
Last active August 7, 2022 13:33
Prefect on Dask on Kubernetes

Prefect on Dask on K8s

Intro

A "Hello, world!" Prefect flow running on an ephemeral Dask Cluster on Kubernetes.

The Prefect Core docs suggest running flows on a Dask cluster via Dask Distributed, and an article from the MET Office Informatics Lab demonstrates running an adaptive Dask cluster on k8s. This example is inspired by those sources, as well as by the docs for each of the technologies used.

Running Instructions

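Start a pod on your k8s cluster on which to run your flow (I use microk8s). Newer kubectl releases no longer accept the --generator flag; there, kubectl run creates a bare pod by default, so the flag can be dropped.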

kubectl run --generator run-pod/v1 my-prefect --env "EXTRA_PIP_PACKAGES=prefect dask-kubernetes" --rm -it --image daskdev/dask -- bash

Copy the two files in this gist (flow.py & worker-spec.yml) into the pod, and run the flow on the pod:

python -m flow

This starts an adaptive Dask cluster (between 1 and 4 workers) on your k8s cluster and runs the Prefect flow on it once a minute, following the flow's schedule, until you interrupt the process.
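
The flow repeats because flow.py attaches an IntervalSchedule with a one-minute interval. As a rough illustration of the timing only, the stdlib-only sketch below lists the first few run times such a schedule would produce. `next_runs` is a hypothetical helper, not part of Prefect, and the fixed start date is an assumption chosen to make the output reproducible.

```python
from datetime import datetime, timedelta
from typing import List


def next_runs(start: datetime, interval: timedelta, count: int) -> List[datetime]:
    """Return the first `count` run times of a fixed-interval schedule."""
    return [start + i * interval for i in range(count)]


# Assumed start time for illustration; flow.py uses utcnow() + 1 second.
start = datetime(2020, 1, 1, 12, 0, 1)
runs = next_runs(start, timedelta(minutes=1), 3)
for run in runs:
    print(run.isoformat())
```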

You can expose port 8787 on your Dask pod in order to get the Dask scheduler dashboard. Here I use a NodePort, but there are other options:

kubectl expose pods/my-prefect --port 8787 --type NodePort

Then you can view the services on your cluster, and pick out the one you just created:

kubectl get svc

will give you something like this:

NAME         TYPE        CLUSTER-IP       EXTERNAL-IP   PORT(S)          AGE
my-prefect   NodePort    10.152.183.79    <none>        8787:32588/TCP   11s

In this case, once the flow is running, I can navigate to localhost:32588 (the node port shown in the PORT(S) column) to see the Dask dashboard.
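
If you want to script this step, the node port can be pulled out of the PORT(S) value. The sketch below is one way to do that in plain Python. `dashboard_url` is a hypothetical helper, and it assumes the `port:nodePort/protocol` layout shown in the sample output above and a node reachable as localhost.

```python
def dashboard_url(ports: str, host: str = "localhost", target_port: int = 8787) -> str:
    """Build the dashboard URL from a PORT(S) value such as '8787:32588/TCP'."""
    for mapping in ports.split(","):
        # Each mapping looks like '<port>:<nodePort>/<protocol>'.
        port_pair, _, _protocol = mapping.strip().partition("/")
        port, _, node_port = port_pair.partition(":")
        if int(port) == target_port and node_port:
            return f"http://{host}:{node_port}"
    raise ValueError(f"no node port found for port {target_port} in {ports!r}")


print(dashboard_url("8787:32588/TCP"))  # http://localhost:32588
```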

Additionally, you can run the following to see the worker pods being created and destroyed by the Dask scheduler:

watch kubectl get pods

Notes

This is all very ad hoc and interactive in order to demonstrate the process from start to finish. In the real world, this whole process would be automated with a few steps:

  • Creating our own image bundled with the correct requirements and the snippets in this gist, instead of using the presupplied daskdev/dask image (which appears to install the EXTRA_PIP_PACKAGES afresh each time a worker is created)
  • Defining a Deployment (with its ReplicaSet) in YAML and applying it to our cluster via gitops, instead of directly running a pod with a generator
  • Defining the NodePort/LoadBalancer service in YAML and applying it via gitops, instead of using kubectl expose

However, I think this is a useful minimal working example of using Dask on Kubernetes as a scheduling backend for Prefect.
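
As a sketch of the third step in the list above, the NodePort service can be written declaratively rather than created with kubectl expose. The helper below builds a minimal Service manifest as a Python dict and prints it as JSON. `nodeport_service` is a hypothetical name, and the `run=my-prefect` selector is an assumption about the label on the pod, so check the pod's actual labels first.

```python
import json


def nodeport_service(name: str, port: int, selector: dict) -> dict:
    """Build a minimal Kubernetes Service manifest of type NodePort."""
    return {
        "apiVersion": "v1",
        "kind": "Service",
        "metadata": {"name": name},
        "spec": {
            "type": "NodePort",
            "selector": selector,
            "ports": [{"port": port, "targetPort": port, "protocol": "TCP"}],
        },
    }


# Assumption: the pod carries the label run=my-prefect; verify with
# `kubectl get pod my-prefect --show-labels` before relying on it.
manifest = nodeport_service("my-prefect", 8787, {"run": "my-prefect"})
print(json.dumps(manifest, indent=2))
```

The printed JSON can be saved to a file and applied with kubectl apply -f, since kubectl accepts JSON manifests as well as YAML.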

"""
Simple Prefect flow with map and reduce conventions.

This module also initialises an adaptive Dask cluster which executes the flow.
"""
from datetime import datetime, timedelta
from time import sleep
from typing import List

from dask.distributed import Client
from dask_kubernetes import KubeCluster
from prefect import Flow, task, unmapped
from prefect.engine.executors import DaskExecutor
from prefect.schedules import IntervalSchedule


@task
def multiply(a: int, b: int):
    # Sleep to make the work appear more expensive:
    sleep(1)
    return a * b


@task
def aggregate(v: List[int]):
    return sum(v)


# Define the Prefect flow and a schedule:
schedule = IntervalSchedule(
    start_date=datetime.utcnow() + timedelta(seconds=1),
    interval=timedelta(minutes=1),
)

with Flow('dask-example', schedule=schedule) as flow:
    data = multiply.map(a=range(100), b=unmapped(2))
    total = aggregate(data)

# Spin up the cluster and actually run the flow:
with KubeCluster.from_yaml('worker-spec.yml') as cluster:
    cluster.adapt(minimum=1, maximum=4)

    # Prefect creates its own client, so this one is not needed to run the
    # flow; it is used only to check that package versions match:
    with Client(cluster) as client:
        client.get_versions(check=True)

    executor = DaskExecutor(address=cluster.scheduler_address)
    flow.run(executor=executor)
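
For reference, the flow above is a map step followed by a reduce step. The plain-Python sketch below reproduces the same arithmetic without Prefect or Dask, which gives a value to compare a flow run against. It omits the sleep and runs serially, so it says nothing about how the work is scheduled.

```python
from typing import List


def multiply(a: int, b: int) -> int:
    return a * b


def aggregate(values: List[int]) -> int:
    return sum(values)


# Map step: multiply every element of range(100) by the constant 2.
data = [multiply(a, 2) for a in range(100)]
# Reduce step: sum the mapped results.
total = aggregate(data)
print(total)  # 9900
```
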
# worker-spec.yml
kind: Pod
metadata:
  labels:
    foo: dask-worker
spec:
  restartPolicy: Never
  containers:
  - image: daskdev/dask:latest
    imagePullPolicy: IfNotPresent
    args: [dask-worker, --nthreads, '2', --no-bokeh, --memory-limit, 6GB, --death-timeout, '60']
    name: dask
    env:
      - name: EXTRA_PIP_PACKAGES
        value: prefect dask-kubernetes git+https://github.com/dask/distributed
    resources:
      limits:
        cpu: "2"
        memory: 6G
      requests:
        cpu: "2"
        memory: 6G