@gitanshu
Forked from ryanjulian/hello_prefect.py
Last active April 1, 2022 12:46
hello_prefect.py
import time

from prefect import task, Flow
from prefect.engine.executors import DaskExecutor
from prefect.environments import LocalEnvironment
from prefect.utilities.logging import get_logger

logger = get_logger('dask-example')

executor = DaskExecutor(address='tcp://localhost:8786')
environment = LocalEnvironment(executor=executor)


# each task claims one TOKEN on whichever worker runs it
@task(tags=['dask-resource:TOKEN=1'])
def sleepy_hello(t):
    logger.info(f'Hello, Prefect! ({t})')
    logger.info(f'Sleeping {t} seconds...')
    time.sleep(t)
    logger.info(f'Done! ({t})')


with Flow('dask-example', environment=environment) as flow:
    # 60 tasks, sleeping 60..119 seconds each
    for j in range(60):
        sleepy_hello(60 + j)

Prefect + dask.distributed Quickstart

Installation

In a new virtual environment:

python -m virtualenv hello-prefect
source hello-prefect/bin/activate
# pin distributed==2.20.0: newer versions depend on cloudpickle>=1.5, which is incompatible with garage for now
pip install dask distributed==2.20.0 bokeh jupyter-server-proxy prefect
prefect backend server
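
To sanity-check the install, a minimal sketch (it only confirms the pinned distributed version and that Prefect imports):

import distributed
import prefect

print('prefect', prefect.__version__)
print('distributed', distributed.__version__)  # expected 2.20.0, per the pin above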

Bring up dask cluster and Prefect server

NOTE: this step requires Docker + docker-compose for Prefect

Each step below should be run in a new terminal session:

Start a dask distributed cluster

Start the dask scheduler

source hello-prefect/bin/activate
dask-scheduler  # listens on localhost:8786

Open a browser to http://localhost:8787 for the Dask Scheduler UI

Start dask worker 1

source hello-prefect/bin/activate
dask-worker <scheduler IP addr>:8786 --resources "TOKEN=1"

Start dask worker 2

source hello-prefect/bin/activate
dask-worker <scheduler IP addr>:8786 --resources "TOKEN=2"
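
To confirm that both workers registered with the scheduler, and to see what resources they advertise, you can connect a client from Python; a sketch (the address matches the one used in hello_prefect.py):

from distributed import Client

client = Client('tcp://localhost:8786')  # connect to the scheduler started above
for addr, info in client.scheduler_info()['workers'].items():
    # each worker entry reports, among other things, its thread count and declared resources
    print(addr, info.get('nthreads'), info.get('resources'))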

Start prefect

prefect server

source hello-prefect/bin/activate
prefect server start

Open a browser to http://localhost:8080 for the Prefect UI

prefect agent

source hello-prefect/bin/activate
prefect agent start

Register your job with prefect

source hello-prefect/bin/activate
# Create a new project, let's call it Dask
prefect create project "Dask"
prefect register flow --file hello_prefect.py --project "Dask"

Your workflow should now show up in the web UI you launched earlier. Alternatively, you can open a browser to the "Flow: http://localhost:8080/flow/..." URL printed by prefect during registration.

Repeated registrations create a new sequential version in the DB.
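
As an alternative to the CLI, a Prefect 0.x flow can register itself from Python; a sketch that could be appended to hello_prefect.py (it assumes the "Dask" project already exists):

# Register this flow with the local Prefect server, equivalent to
# `prefect register flow --file hello_prefect.py --project "Dask"` above.
flow.register(project_name='Dask')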

Run your job with prefect

From the Web UI, click "Quick Run" on your Flow record. This will schedule 60 long-running tasks onto the Dask Distributed cluster.
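
If you only want to exercise the Dask cluster without the server's bookkeeping, the flow can also be run directly; a sketch of a line you could append to hello_prefect.py (this bypasses the Prefect server and agent entirely):

# Direct run on the Dask cluster using the DaskExecutor defined in hello_prefect.py;
# no registration, agent, or UI involved.
flow.run(executor=executor)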

Check out your job

Console logs

You should see some console logs from your tasks in the terminal windows for your dask workers.

dask.distributed dashboard

Look at http://localhost:8787.

You will see a Gantt chart of the tasks being executed, and some bar charts representing queued tasks. Clicking on a task's block in the Gantt chart will show you a sampled profile of its execution.

On the "Info" tab you can click through to dashboards for individual workers. These will show you resource accounting and task status on the worker, among other stats.

Prefect UI

Look at http://localhost:8080.

The "Logs" tab of the Prefect UI shows streaming log output from each of your tasks. The UI also shows a graph representation of your workflow and a Gantt chart of the tasks, similar to dask.
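
Note that hello_prefect.py uses a module-level logger from prefect.utilities.logging; if your task output does not appear under "Logs", the usual Prefect 0.x route is the task-local logger from the context, e.g. (a sketch):

import prefect
from prefect import task

@task
def noisy_task():
    # prefect.context supplies a logger whose records are shipped to the Prefect backend
    logger = prefect.context.get('logger')
    logger.info('Hello from inside a task!')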

Each registration creates a new version of the same (work)flow. Each run of that (flow, version) creates a unique run record, i.e. runs are unique keys of the form (flow, version, run_id).

Verifying resource accounting is working

You'll notice that one worker executes only 1 task at a time and the other only 2 at a time, even though they each have nproc threads. That's because every task claims dask-resource:TOKEN=1, and the two workers were started advertising TOKEN=1 and TOKEN=2 respectively. Remove the dask-resource tags, re-register, and re-run the job, and each worker will now run nproc tasks in parallel (see the sketch below).
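
Concretely, removing the tag just means the task declares no resource claims at all; a sketch of the untagged decorator in hello_prefect.py:

@task  # no 'dask-resource:...' tags, so only the worker's thread count limits concurrency
def sleepy_hello(t):
    ...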

More info on dask-workers and resources

Resources

Resources are completely arbitrary and abstract. That is, you could replace TOKEN with GPU or CPU or CHICKEN. A resource is just a labeled quantity which tasks can declare they require exclusively. A task's resource claims decrement available resources on the worker process, and the worker process will not schedule any task which would allow its resource counters to go negative.
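
The same mechanism is exposed directly by dask.distributed, independent of Prefect. A sketch (the function is hypothetical, and it assumes a worker was started with --resources CHICKEN=1):

from distributed import Client

def lay_egg():
    # hypothetical workload; the interesting part is the resources= claim below
    return 'egg'

client = Client('tcp://localhost:8786')
# The scheduler will only place this task on a worker advertising at least CHICKEN=1.
future = client.submit(lay_egg, resources={'CHICKEN': 1})
print(future.result())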

IMPORTANT: Resource declarations apply PER PROCESS regardless of invocation. That means the following two invocations are equivalent for resource purposes.

One worker, two processes:

dask-worker localhost:8786 --nprocs 2 --resources TOKEN=1

Two workers, one process each:

dask-worker localhost:8786 --resources TOKEN=1 &  # default --nprocs 1
dask-worker localhost:8786 --resources TOKEN=1 &

Dask worker processes, threads, and tasks

dask-worker by default launches with 1 process and as many threads as your processor has hyperthreads. A task consumes a single thread. The number of processes and threads-per-process can be controlled by the --nprocs and --nthreads flags respectively.

Garage is a multi-threaded, multi-process application, so we would want to ensure that each experiment runs in its own process, and that each of those processes has 2 worker threads (1 for garage and 1 for servicing the dask API). To avoid overloading CPUs, GPUs, and memory, we would have to account for that on a per-experiment basis.

Launch worker:

NCPU="$(nproc)"
NMEM="$(awk '/MemFree/ { printf $2 }' /proc/meminfo)"  # free memory in kB

dask-worker localhost:8786 \
  --nprocs "${NCPU}" \
  --nthreads 2 \
  --resources "PROCESS=${NCPU},CPU=${NCPU},MEMORY=${NMEM}"

Resource tags:

@prefect.task(tags=['dask-resource:PROCESS=1'])
@garage.wrap_experiment
def my_experiment(ctxt, t):
    ...

Strategies for GPUs

In the case of GPUs, assuming we want GPU exclusivity and locality, we could instead start 1 worker per GPU and divide the processes equally among them. We can use CUDA_VISIBLE_DEVICES to enforce exclusivity.

If you don't want exclusivity, dask will happily schedule fractional GPUs.
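
For example, a task tagged as in the sketch below claims half a GPU, so two of them could share a single worker advertising GPU=1 (this assumes the dask-resource tag parser accepts fractional values, just as it accepts the floating-point MEMORY value used further down):

# Sketch: two such tasks can be co-scheduled on a worker started with --resources GPU=1.
@prefect.task(tags=['dask-resource:GPU=0.5'])
@garage.wrap_experiment
def my_experiment(ctxt, t):
    ...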

Experiment-exclusive GPUs, with fixed CPU and memory allocations per-worker.

Launch worker:

NCPU="$(nproc)"
NMEM="$(awk '/MemFree/ { printf $2 }' /proc/meminfo)"  # free memory in kB
NGPU=4
NCPU_PER_GPU="$(( $NCPU / $NGPU ))"
NMEM_PER_GPU="$(( $NMEM / $NGPU ))"

for i in {0..3}; do  # one worker per GPU (NGPU=4)
  CUDA_VISIBLE_DEVICES="$i" dask-worker localhost:8786 \
    --nprocs "${NCPU_PER_GPU}" \
    --nthreads 2 \
    --resources "PROCESS=${NCPU},GPU=1,CPU=${NCPU_PER_GPU},MEMORY=${NMEM_PER_GPU}" &
done

Resource tags:

# 1 GPU, ~10GB RAM, 18 CPU threads
@prefect.task(tags=[
    'dask-resource:PROCESS=1',
    'dask-resource:MEMORY=10e9',
    'dask-resource:GPU=1',
    'dask-resource:CPU=18',
])
@garage.wrap_experiment
def my_experiment(ctxt, t):
    ...