@angus-g
Created May 9, 2023 01:03
Dask tutorial notes

Dask

Tutorial

A brief example

Set up a synthetic example: load some chunked data and wrap it in a Dask array. With this, we can see the task graph.
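
The original notebook isn't reproduced in these notes, so here is a minimal sketch of what such a synthetic example might look like; the shapes, chunk sizes and variable names are illustrative assumptions.

```python
import dask.array as da

# A synthetic "dataset": 100 chunks of 1000x1000 random numbers
x = da.random.random((10_000, 10_000), chunks=(1_000, 1_000))
y = (x + x.T).mean(axis=0)

print(y)                   # still lazy: a dask array with a recipe attached
y.visualize("graph.svg")   # draw the task graph (requires graphviz)
result = y.compute()       # actually execute the graph
```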

Terminology and how Dask works under the hood

Dask provides multi-core and distributed parallel execution on larger-than-memory datasets. [0]

Thinking about the second half of this first, Dask has a suite of collection types (Array, Bag, DataFrame) that mimic standard in-memory collections. We most often encounter the Array, which implements numpy's ndarray interface, chunking up operations to run on smaller sub-arrays. xarray will wrap this underlying object with some extra metadata to keep track of named dimensions, etc.
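
As a small illustration (with assumed shapes and dimension names), an xarray DataArray can be built directly on top of a dask Array, and the chunked object stays accessible underneath:

```python
import dask.array as da
import xarray as xr

data = da.zeros((365, 180, 360), chunks=(30, 180, 360))
temp = xr.DataArray(data, dims=("time", "lat", "lon"), name="temp")

print(type(temp.data))   # the dask Array is still there underneath
print(temp.chunks)       # chunk sizes along each dimension
```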

A parallel-aware collection like a dask Array is no good without some way to actually process it in parallel. This is where a lot of the more arcane and "computer sciency" terminology stems from. According to the Dask tutorial:

Dask provides dynamic task schedulers that execute task graphs in parallel.

Again, working from the second half backwards, let's talk about task graphs [1]. We saw an example of these in the brief example. When we perform any operations on a Dask collection, they essentially get collected into a "recipe": a series of steps for taking the input data (a set of chunks) and turning it into the output, which could be a series of transformed chunks, or a reduced subset of the original data.
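
To make the "recipe" idea concrete, here is a small sketch (sizes are arbitrary) showing that each operation just adds tasks to the graph without computing anything:

```python
import dask.array as da

x = da.ones((8, 8), chunks=(4, 4))   # four input chunks
print(len(x.__dask_graph__()))       # one task per input chunk

y = (x + 1).sum()                    # each operation appends to the recipe
print(len(y.__dask_graph__()))       # per-chunk adds and sums, plus the
                                     # aggregation steps that combine them
```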

Importantly, how we chunk our data has a big impact on the task graph: bigger chunks mean fewer tasks, each processing more data at once, and vice versa. But the relationship is not necessarily linear! With more chunks, there may be more intermediate "aggregation" tasks needed to combine all their results together.
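
A quick way to see this (again with arbitrary sizes) is to compare the number of tasks for the same reduction under coarse and fine chunking:

```python
import dask.array as da

coarse = da.ones((10_000, 10_000), chunks=(5_000, 5_000)).mean()  #   4 chunks
fine = da.ones((10_000, 10_000), chunks=(500, 500)).mean()        # 400 chunks

# More chunks means more tasks, including the extra tree-reduction steps
# needed to aggregate all the partial means back together.
print(len(coarse.__dask_graph__()), len(fine.__dask_graph__()))
```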

Once we have described how to transform our input collection into an interesting result through a task graph, it is the scheduler's job to manage the tasks. A given task encapsulates its dependencies (data from disk, or a previous task) and a computation (load from disk, add, etc.) [2]. At its disposal, the scheduler has a set of workers, which wait to be given tasks and execute them. This is a very complicated process! The scheduler wants to choose workers that hold as many of the dependencies as possible, and prefers to minimise communication between workers [3].
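
A toy illustration (not from the notes) of tasks and their dependencies, built explicitly with dask.delayed rather than through the Array collection:

```python
import dask

@dask.delayed
def load(i):
    return list(range(i * 10, (i + 1) * 10))   # stands in for "load from disk"

@dask.delayed
def partial_sum(chunk):
    return sum(chunk)

@dask.delayed
def combine(parts):
    return sum(parts)

parts = [partial_sum(load(i)) for i in range(4)]  # each depends on a load task
result = combine(parts)                           # depends on all partial sums
print(result.compute())   # the scheduler resolves the dependency order
```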

Schedulers

There are many ways to consume the task graph, and Dask provides a few different schedulers for this job. By default, if we don't explicitly start a Client, Dask will use its threaded scheduler for arrays [4]. Threads are computational units (i.e. roughly mapping to CPU cores) that can share memory. There is only a small per-task overhead, and communicating data between threads is free. However, because of Python's Global Interpreter Lock (the GIL), threads can only run concurrently while they're executing non-Python code, such as loading data from disk or inside numpy operations.
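
The single-machine scheduler can also be chosen per call (or through dask.config); a small sketch, with an arbitrary array:

```python
import dask.array as da

x = da.random.random((4_000, 4_000), chunks=(1_000, 1_000))

# With no Client running, this uses the default threaded scheduler:
x.sum().compute()

# Other single-machine options:
x.sum().compute(scheduler="synchronous")   # one thread; simplest to debug
x.sum().compute(scheduler="processes")     # separate processes; sidesteps the GIL
```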

Usually, however, we reach for Dask's distributed scheduler. The shorthand Client() starts a Dask client (for communicating between Dask and the scheduler itself), and connects that to a new LocalCluster, which is the actual scheduler, wrangling its own herd of workers.
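
Spelled out, the shorthand looks roughly like this (the worker and thread counts are illustrative, not a recommendation):

```python
from dask.distributed import Client, LocalCluster

cluster = LocalCluster(n_workers=4, threads_per_worker=2)  # scheduler + workers
client = Client(cluster)                                   # our handle on it

print(client.dashboard_link)   # the diagnostics dashboard for this cluster
```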

Workers

The next step down the chain is the workers. The scheduler keeps track of the workers: whether they appear or disappear (dynamic scaling); their current workload; and the data they're currently holding (a quick way to inspect this is sketched after the list below). A worker itself is responsible for

  1. computing tasks, as directed by the scheduler;
  2. storing and serving results to other workers or clients.
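
Assuming a LocalCluster like the one above, here is a sketch of peeking at what the scheduler knows about its workers (the exact keys may vary between dask versions):

```python
from dask.distributed import Client

client = Client()   # shorthand: starts a LocalCluster and connects to it

info = client.scheduler_info()
for addr, worker in info["workers"].items():
    print(addr, worker.get("nthreads"), worker.get("memory_limit"))

print(client.who_has())   # which workers currently hold which results
```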

Workers manage their own pool of threads for performing the actual computation, like the threaded scheduler we saw before. As such, they have the same restrictions with regard to the GIL. Workers are also not particularly sacred: they are monitored by the "nanny" process, which will restart them if their memory usage crosses a given fraction of available memory.

By default, workers use a few heuristics to try to stay under their memory limit (the corresponding configuration keys are sketched after this list):

  • ideally, they stay below 60% of their limit
  • above an estimated 60%, spill the least recently used data to disk
  • above a reported 70%, spill to disk (accounts for extra memory usage)
  • above 80%, stop accepting new work
  • above 95%, terminate and restart
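
These thresholds map onto Dask's configuration, shown here with their default values as a sketch of where they live rather than a suggestion to change them:

```python
import dask

dask.config.set({
    "distributed.worker.memory.target": 0.60,     # start spilling (estimated usage)
    "distributed.worker.memory.spill": 0.70,      # spill (usage reported by the OS)
    "distributed.worker.memory.pause": 0.80,      # stop accepting new work
    "distributed.worker.memory.terminate": 0.95,  # the nanny restarts the worker
})
```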

The disk I/O associated with disk spills shows up as orange blocks in the task stream. The reported memory usage comes from the operating system, and works around memory usage that is under-counted within Python, due to things like custom objects or data accumulated in buffers.

Topics

  • how to interpret the dashboard

  • how to obtain information from the task graphs

  • optimise chunking to avoid memory errors but still be efficient

  • diagnosing I/O bottlenecks

  • disk vs. dask chunking
