In a new virtual environment:
python -m virtualenv hello-prefect
source hello-prefect/bin/activate
pip install dask distributed bokeh jupyter-server-proxy prefect
prefect backend server
NOTE: this step requires Docker + docker-compose for Prefect
Each of the steps below should be run in a new terminal session:
source hello-prefect/bin/activate
dask-scheduler # listens on localhost:8786
Open a browser to http://localhost:8787 for the Dask Scheduler UI
source hello-prefect/bin/activate
dask-worker localhost:8786 --resources "TOKEN=1"
source hello-prefect/bin/activate
dask-worker localhost:8786 --resources "TOKEN=2"
source hello-prefect/bin/activate
prefect server start
Open a browser to http://localhost:8080 for the Prefect UI
source hello-prefect/bin/activate
prefect agent start
source hello-prefect/bin/activate
prefect register flow --file hello_prefect.py --name dask-example
Your workflow should now show up in the web UI you launched earlier. Alternatively, you can open a browser to the "Flow: http://localhost:8080/flow/..." URL printed by Prefect.
Repeated registrations create a new sequential version in the DB.
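For reference, a minimal hello_prefect.py might look like the sketch below. This is an illustrative sketch against the Prefect 1.x API, not the canonical file: the sleep duration, logging, and task name are assumptions (the TOKEN tag and the 60-task count come from the steps in this walkthrough).
# hello_prefect.py -- minimal sketch (assumptions noted above)
import time

import prefect
from prefect import Flow, task
from prefect.executors import DaskExecutor

@task(tags=['dask-resource:TOKEN=1'])  # each task claims 1 TOKEN on its worker
def long_running_task(n):
    logger = prefect.context.get('logger')
    logger.info('task %d starting', n)
    time.sleep(30)  # simulate a long-running workload
    logger.info('task %d finished', n)

with Flow('dask-example') as flow:
    long_running_task.map(list(range(60)))

# Run tasks on the Dask cluster started above
flow.executor = DaskExecutor(address='tcp://localhost:8786')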
From the Web UI, click "Quick Run" on your Flow record. This will schedule 60 long-running tasks onto the Dask Distributed cluster.
You should see some console logs from your tasks in the terminal windows for your dask workers.
Look at http://localhost:8787.
You will see a Gantt chart of the tasks being executed, and some bar charts representing queued tasks. Clicking on a task's block in the Gantt chart will show you a sampled profile of its execution.
On the "Info" tab you can click through to dashboards for individual workers. These will show you resource accounting and task status on the worker, among other stats.
Look at http://localhost:8080.
The "Logs" tab of prefect will show you streaming log output from each of your tasks. It will also show you a graph representation of your workflow and a Gantt chart of the tasks, similar to dask.
Each registration creates a new version of the same (work)flow. Each run of that (flow, version) creates a unique run record, i.e. runs are unique keys of the form (flow, version, run_id).
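For example, registering the same file a second time bumps the version (the version number here is illustrative):
prefect register flow --file hello_prefect.py --name dask-example  # creates version 2 of dask-example
A subsequent Quick Run of that version is then recorded under a fresh run_id, i.e. the key (dask-example, 2, <run_id>).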
You'll notice that one worker only executes 1 task at a time, and the other only executes 2 tasks at once, even though they each have nproc threads. That's because we limited concurrency with the TOKEN resource: each task claims TOKEN=1, so the worker started with TOKEN=1 can run one task at a time and the worker started with TOKEN=2 can run two. Remove the dask-resource tags and re-register/run the flow, and each worker will now run nproc tasks in parallel.
Resources are completely arbitrary and abstract. That is, you could replace TOKEN with GPU or CPU or CHICKEN. A resource is just a labeled quantity which tasks can declare they require exclusively. A task's resource claims decrement the available resources on the worker process, and the worker process will not schedule any task which would allow its resource counters to go negative.
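To see this outside Prefect, a raw Dask client can claim any labeled resource at submission time; the CHICKEN label below is exactly as arbitrary as promised (a sketch, assuming a worker was started with --resources "CHICKEN=2"):
from dask.distributed import Client

client = Client('localhost:8786')

# Only schedulable onto workers started with, e.g.:
#   dask-worker localhost:8786 --resources "CHICKEN=2"
future = client.submit(lambda x: x + 1, 41, resources={'CHICKEN': 1})
print(future.result())  # 42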
IMPORTANT: Resource declarations apply PER PROCESS regardless of invocation. That means the following two invocations are equivalent for resource purposes.
One worker, two processes:
dask-worker localhost:8786 --nprocs=2 --resources TOKEN=1
Two workers, one process each:
dask-worker localhost:8786 --resources TOKEN=1 & # default --nproc=1
dask-worker localhost:8786 --resources TOKEN=1 &
dask-worker by default launches with 1 process and as many threads as your processor has hyperthreads. A task consumes a single thread. The number of processes and threads-per-process can be controlled by the --nprocs and --nthreads flags respectively.
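For example, the following invocation yields 2 processes with 4 threads each, i.e. up to 8 concurrently-executing tasks:
dask-worker localhost:8786 --nprocs=2 --nthreads=4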
Garage is a multi-threaded, multi-process application, so we would want to ensure that all experiments run in their own process, and that each of those processes has 2 worker threads (1 for garage and 1 for servicing the Dask API). To avoid overloading CPUs, GPUs, and memory, we would have to account for those resources on a per-experiment basis.
Launch worker:
NCPU="$(nproc)"
# MemFree in /proc/meminfo is reported in kB; convert to bytes to match tags like MEMORY=10e9
NMEM="$(awk '/MemFree/ { print $2 * 1024 }' /proc/meminfo)"
dask-worker localhost:8786 \
    --nprocs="${NCPU}" \
    --nthreads 2 \
    --resources "PROCESS=1,CPU=${NCPU},MEMORY=${NMEM}"  # declared per process; PROCESS=1 keeps each experiment in its own process
Resource tags:
@prefect.task(tags=['dask-resource:PROCESS=1'])
@garage.wrap_experiment
def my_experiment(ctxt, t):
...
In the case of GPUs, assuming we want GPU exclusivity and locality, we could instead start 1 worker per GPU and divide the processes equally among them. We can use CUDA_VISIBLE_DEVICES to enforce exclusivity. If you don't want exclusivity, Dask will happily schedule fractional GPUs.
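For instance, a task could claim half a GPU; Dask resource quantities may be fractional, so two such tasks fit on a worker declaring GPU=1 (a sketch; the task name is illustrative, and it assumes Prefect passes the fractional value through to Dask):
@prefect.task(tags=['dask-resource:GPU=0.5'])  # two of these fit on a GPU=1 worker
@garage.wrap_experiment
def half_gpu_experiment(ctxt, t):
    ...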
Experiment-exclusive GPUs, with fixed CPU and memory allocations per-worker.
Launch worker:
NCPU="$(nproc)"
NMEM="$(awk '/MemFree/ { print $2 * 1024 }' /proc/meminfo)"  # bytes
NGPU=4
NCPU_PER_GPU="$(( NCPU / NGPU ))"
NMEM_PER_GPU="$(( NMEM / NGPU ))"
for i in $(seq 0 $(( NGPU - 1 ))); do
    CUDA_VISIBLE_DEVICES="${i}" dask-worker localhost:8786 \
        --nprocs="${NCPU_PER_GPU}" \
        --nthreads 2 \
        --resources "PROCESS=1,GPU=1,CPU=${NCPU_PER_GPU},MEMORY=${NMEM_PER_GPU}" &
done
Resource tags:
# 1 GPU, ~10GB RAM, 18 CPU threads
@prefect.task(tags=[
'dask-resource:PROCESS=1',
'dask-resource:MEMORY=10e9',
'dask-resource:GPU=1',
'dask-resource:CPU=18',
])
@garage.wrap_experiment
def my_experiment(ctxt, t):
...