@jacobtomlinson
Last active July 20, 2022 17:04
Intro to distributed computing on GPUs with Dask in Python
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"<img src='https://docs.dask.org/en/latest/_images/dask_horizontal.svg' width=400>\n",
"\n",
"# Dask natively scales Python\n",
"**Dask provides advanced parallelism for analytics, enabling performance at scale for the tools you love**\n",
"\n",
"_Integrates with existing projects_\n",
"\n",
"_BUILT WITH THE BROADER COMMUNITY_\n",
"\n",
"Dask is open source and freely available. It is developed in coordination with other community projects like Numpy, Pandas, and Scikit-Learn.\n",
"\n",
"*(from the Dask project homepage at dask.org)*\n",
"\n",
"* * *\n",
"\n",
"__What Does This Mean?__\n",
"* Built in Python\n",
"* Scales *properly* from single laptops to 1000-node clusters\n",
"* Leverages and interops with existing Python APIs as much as possible\n",
"* Adheres to (Tim Peters') \"Zen of Python\" (https://www.python.org/dev/peps/pep-0020/) ... especially these elements:\n",
" * Explicit is better than implicit.\n",
" * Simple is better than complex.\n",
" * Complex is better than complicated.\n",
" * Readability counts. <i>[ed: that goes for docs, too!]</i>\n",
" * Special cases aren't special enough to break the rules.\n",
" * Although practicality beats purity.\n",
" * In the face of ambiguity, refuse the temptation to guess.\n",
" * If the implementation is hard to explain, it's a bad idea.\n",
" * If the implementation is easy to explain, it may be a good idea.\n",
"* While we're borrowing inspiration, it Dask embodies one of Perl's slogans, making easy things easy and hard things possible\n",
" * Specifically, it supports common data-parallel abstractions like Pandas and Numpy\n",
" * But also allows scheduling arbitary custom computation that doesn't fit a preset mold"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## First a little housekeeping\n",
"\n",
"This cell is intended to capture any boilerplate code that we need to set things up on `app.blazingsql.com`, the intention is to simplify running this notebook as a tutorial. Please run the next cell and then don't worry about it."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import dask.config\n",
"import getpass\n",
"dask.config.set({\"distributed.dashboard.link\": f\"https://app.blazingsql.com/jupyter/user/{getpass.getuser()}/proxy/{{port}}/status\"})\n",
"!mkdir data\n",
"!cd data && wget https://github.com/jacobtomlinson/dask-video-tutorial-2020/raw/master/data/beer_small.csv\n",
"!cd data && wget https://github.com/jacobtomlinson/dask-video-tutorial-2020/raw/master/data/pageviews_small.csv"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Dask Dataframes\n",
"\n",
"Let's start with one common use case for Dask: scaling dataframes to \n",
"* larger datasets (which don't fit in memory) and \n",
"* multiple processes (which could be on multiple nodes)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from dask.distributed import Client\n",
"from dask_cuda import LocalCUDACluster\n",
"\n",
"cluster = LocalCUDACluster()\n",
"client = Client(cluster)\n",
"\n",
"client"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import dask_cudf\n",
"\n",
"ddf = dask_cudf.read_csv('data/beer_small.csv', blocksize=12e7)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"ddf"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### What is this Dask Dataframe?\n",
"\n",
"A large, virtual dataframe divided along the index into multiple dataframes. \n",
"\n",
"_As we are working with RAPIDS here we will be using [cuDF](https://docs.rapids.ai/api/cudf/stable/) as our sub-dataframe type instead of the traditional Pandas._\n",
"\n",
"<img src=\"https://docs.dask.org/en/latest/_images/dask-dataframe.svg\" width=\"400px\">"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"ddf.map_partitions(type).compute()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"ddf.head()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"ddf[ddf.beer_style.str.contains('IPA')].head()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"ipa = ddf[ddf.beer_style.str.contains('IPA')]"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"mean_ipa_review = ipa.groupby('brewery_name').review_overall.agg(['mean','count'])"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"mean_ipa_review.compute()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"`compute` doesn't just run the work, it collects the result to a single, regular Pandas dataframe right here in our initial Python VM.\n",
"\n",
"Having a local result is convenient, but if we are generating large results, we may want (or need) to produce output in parallel to the filesystem, instead. \n",
"\n",
"There are writing counterparts to read methods which we can use:\n",
"\n",
"- `read_csv` \\ `to_csv`\n",
"- `read_hdf` \\ `to_hdf`\n",
"- `read_json` \\ `to_json`\n",
"- `read_parquet` \\ `to_parquet`"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"mean_ipa_review.to_csv('ipa-*.csv') #the * is where the partition number will go"
]
},
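{
"cell_type": "markdown",
"metadata": {},
"source": [
"For larger results a columnar, splittable format such as Parquet is usually a better choice than CSV. A minimal sketch (the output path is just an illustrative name):\n",
"\n",
"```python\n",
"# Each partition is written as a separate file inside the output directory, in parallel\n",
"mean_ipa_review.to_parquet('ipa-reviews.parquet')\n",
"```"
]
},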
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Another dataframe example"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"client.restart()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"__Read data__ from the `pageviews_small.csv` file. Use Dask's `blocksize=` parameter to set each partition to max of 100 MB."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import dask_cudf\n",
"\n",
"ddf = dask_cudf.read_csv('data/pageviews_small.csv', sep=' ', blocksize=10e7)\n",
"\n",
"ddf"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"__Change the column names__ to `project`, `page`, `requests`, and `x` then drop the `x` column."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"ddf.columns = ['project', 'page', 'requests', 'x']\n",
"\n",
"ddf2 = ddf.drop('x', axis=1)\n",
"\n",
"ddf2"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"__Filter__ for `project` matching \"en\" (English Wikipedia)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"ddf3 = ddf2[ddf2.project == 'en']\n",
"ddf3"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"__Count__ how many pages were accessed from English Wikipedia vs. all projects in this dataset. (Note: each project/page combination appears on a unique line, so this amounts to just counting records)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"ddf2.count().compute() #all"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"ddf3.count().compute() #English"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"__Show the record counts__ for English (en), French (fr), Chinese (zh), and Polish (pl)."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"ddf4 = ddf2.groupby('project').count().reset_index()\n",
"\n",
"ddf4[ddf4.project.isin(['en', 'fr', 'zh', 'pl'])].compute()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Dask Dashboard\n",
"\n",
"Here we'll revisit that code one more time.\n",
"\n",
"But this time, we'll focus less on getting the answers, and more on seeing what Dask is doing.\n",
"\n",
"Specifically, we'll look at some of the elements of the Dask Dashboard GUI."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"client.restart()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The dashboard is available at the URL above.\n",
"\n",
"When we __Read data__ you'll notice that nothing happens in the Dask GUI widgets, because these operations are just setting up a compute graph which will be executed later:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import dask_cudf\n",
"\n",
"ddf = dask_cudf.read_csv('data/pageviews_small.csv', sep=' ', blocksize=10e7)\n",
"\n",
"ddf.columns = ['project', 'page', 'requests', 'x']"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"When we __Count__ (and `.compute()`) all the records, we'll see tasks get scheduled. __Before__ running this command, note memory, CPU, etc. in the GUI"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"ddf.drop('x', axis=1).count().compute() #all"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The GUI tells us quite a lot about what's happened. If you really want to see how the computation was decomposed by Dask, you can render a task graph before executing (although you won't normally need to do this):"
]
},
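{
"cell_type": "markdown",
"metadata": {},
"source": [
"For example, a minimal sketch (rendering the graph requires the optional `graphviz` dependency, so it may not display in every environment):\n",
"\n",
"```python\n",
"# Build the lazy computation, then draw its task graph instead of running it\n",
"counts = ddf.drop('x', axis=1).count()\n",
"counts.visualize()\n",
"```"
]
},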
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Let's look at Dask's Profile View\n",
"\n",
"Note: almost all of Dask's dashboard views update in realtime. The Profile View __does not__. Although Dask is collecting perf data behind the scenes, the profiler timeline doesn't update until you click the \"Update\" button. \n",
"\n",
"At that point you can select a time period from the refreshed timeline, and Dask will render a flame graph from that selected period."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Dask Array\n",
"\n",
"Depending on the focus of your work, Dask Array is likely to be the first interface you use for Dask after Dataframe ... or perhaps just the first interface you use (e.g., if you work primarily with NumPy).\n",
"\n",
"Dask Array implements a subset of the NumPy ndarray interface using blocked algorithms, cutting up the large array into many small arrays. This lets us compute on arrays larger than memory using all of our cores. We coordinate these blocked algorithms using Dask graphs.\n",
"\n",
"Dask arrays coordinate many NumPy arrays arranged into a grid. These NumPy arrays may live on disk or on other machines.\n",
"\n",
"_Again as we are working with RAPIDS here we will be using [CuPy](https://cupy.dev/) as our sub-array type instead of the traditional NumPy._\n",
"\n",
"<img src=\"https://docs.dask.org/en/latest/_images/dask-array-black-text.svg\">"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Dask Arrays\n",
"\n",
"- Dask arrays are chunked, n-dimensional arrays\n",
"- Can think of a Dask array as a collection of `ndarray` arrays\n",
"- Dask arrays implement a large subset of the NumPy API using blocked algorithms\n",
"- For many purposes Dask arrays can serve as drop-in replacements for NumPy arrays"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"client.restart()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import cupy as cp\n",
"import dask.array as da"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"a_cp = cp.arange(1, 50, 3)\n",
"a_cp"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"a_da = da.arange(1, 50, 3, chunks=5, like=cp.empty(0))\n",
"a_da"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"print(a_da.dtype)\n",
"print(a_da.shape)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"print(a_da.chunks)\n",
"print(a_da.chunksize)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"a_da ** 2"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"(a_da ** 2).compute()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"type((a_da ** 2).compute())"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Dask arrays support a large portion of the NumPy interface:\n",
"\n",
"- Arithmetic and scalar mathematics: `+`, `*`, `exp`, `log`, ...\n",
"\n",
"- Reductions along axes: `sum()`, `mean()`, `std()`, `sum(axis=0)`, ...\n",
"\n",
"- Tensor contractions / dot products / matrix multiply: `tensordot`\n",
"\n",
"- Axis reordering / transpose: `transpose`\n",
"\n",
"- Slicing: `x[:100, 500:100:-2]`\n",
"\n",
"- Fancy indexing along single axes with lists or numpy arrays: `x[:, [10, 1, 5]]`\n",
"\n",
"- Array protocols like `__array__` and `__array_ufunc__`\n",
"\n",
"- Some linear algebra: `svd`, `qr`, `solve`, `solve_triangular`, `lstsq`, ...\n",
"\n",
"- ...\n",
"\n",
"See the [Dask array API docs](http://docs.dask.org/en/latest/array-api.html) for full details about what portion of the NumPy API is implemented for Dask arrays."
]
},
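{
"cell_type": "markdown",
"metadata": {},
"source": [
"As a small illustration, a sketch combining a few of these operations on the `a_da` array from above (everything stays lazy until `compute`):\n",
"\n",
"```python\n",
"b = da.exp(a_da) + a_da.sum()   # elementwise ufunc plus a reduction\n",
"c = a_da[::2]                   # slicing\n",
"(b.sum() + c.sum()).compute()   # nothing runs until compute()\n",
"```"
]
},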
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Blocked Algorithms\n",
"\n",
"Dask arrays are implemented using _blocked algorithms_. These algorithms break up a computation on a large array into many computations on smaller peices of the array. This minimizes the memory load (amount of RAM) of computations and allows for working with larger-than-memory datasets in parallel."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"rs = da.random.RandomState(RandomState=cp.random.RandomState)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"x = rs.random(20, chunks=5)\n",
"x"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"result = x.sum()\n",
"result"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"result.compute()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Dask supports a large portion of the NumPy API. This can be used to build up more complex computations using the familiar NumPy operations you're used to."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"x = rs.random(size=(15, 15), chunks=(10, 5))\n",
"x"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"result = (x + x.T).sum()\n",
"result"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"result.compute()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Diving deeper with Dask Delayed\n",
"\n",
"*More detailed docs are online at:*\n",
"* https://docs.dask.org/en/latest/delayed.html\n",
"* https://docs.dask.org/en/latest/futures.html\n",
"\n",
"Sometimes problems don’t fit nicely into one of the high-level collections like Dask arrays or Dask DataFrames. In these cases, you can parallelize custom algorithms using the Dask `delayed` interface. This allows one to manually create task graphs with a light annotation of normal Python code."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import time\n",
"\n",
"def inc(x):\n",
" time.sleep(0.5)\n",
" return x + 1\n",
"\n",
"def double(x):\n",
" time.sleep(0.5)\n",
" return 2 * x\n",
"\n",
"def add(x, y):\n",
" time.sleep(0.5)\n",
" return x + y"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"%%time\n",
"\n",
"data = [1, 2, 3, 4]\n",
"\n",
"output = []\n",
"for x in data:\n",
" a = inc(x)\n",
" b = double(x)\n",
" c = add(a, b)\n",
" output.append(c)\n",
"\n",
"total = sum(output)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Dask `delayed` wraps function calls and delays their execution. Rather than computing results immediately, `delayed` functions record what we want to compute as a task into a graph that we’ll run later on parallel hardware by calling `compute`."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from dask import delayed"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"lazy_inc = delayed(inc)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"inc_output = lazy_inc(3) #inc(3)\n",
"inc_output"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# inc_output.visualize()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"inc_output.compute()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Using delayed functions, we can build up a task graph for the particular computation we want to perform"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"double_inc_output = lazy_inc(inc_output)\n",
"double_inc_output"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# double_inc_output.visualize()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"double_inc_output.compute()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"We can use `delayed` to make our previous example computation lazy by wrapping all the function calls with delayed"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"@delayed\n",
"def inc(x):\n",
" time.sleep(0.5)\n",
" return x + 1\n",
"\n",
"@delayed\n",
"def double(x):\n",
" time.sleep(0.5)\n",
" return 2 * x\n",
"\n",
"@delayed\n",
"def add(x, y):\n",
" time.sleep(0.5)\n",
" return x + y"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Now `add` returns a `Delayed` object which you can call `compute()` on at a later time"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"%%time\n",
"\n",
"data = [1, 2, 3, 4]\n",
"\n",
"output = []\n",
"for x in data:\n",
" a = inc(x)\n",
" b = double(x)\n",
" c = add(a, b)\n",
" output.append(c)\n",
"\n",
"total = delayed(sum)(output)\n",
"total"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# total.visualize()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"%%time\n",
"\n",
"total.compute()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Check out the [Dask delayed best practices](http://docs.dask.org/en/latest/delayed-best-practices.html) page to avoid some common problems when using `delayed`. "
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## `concurrent.futures` interface\n",
"\n",
"The Dask distributed scheduler implements a superset of Python's [`concurrent.futures`](https://docs.python.org/3/library/concurrent.futures.html) interface that allows for finer control and asynchronous computation.\n",
"\n",
"The `submit` function sends a function and arguments to the distributed scheduler for processing. They return `Future` objects that refer to remote data on the cluster. The `Future` returns immediately while the computations run remotely in the background. There is no blocking of the local Python session."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import random\n",
"\n",
"def inc(x):\n",
" time.sleep(random.uniform(0, 2))\n",
" return x + 1"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"f = client.submit(inc, 7.2) # Submits inc(7.2) to the distributed scheduler\n",
"print(f)\n",
"print(type(f))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Once the computation for the `Future` is complete, you can retrieve the result using the `.result()` method"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"print(f)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"f.result()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The `map` function can be used to apply a function on a sequence of arguments (similar to the built-in Python `map` function).\n",
"\n",
"To delete `Futures` in distributed memory, use the `del` keyword"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"del f"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"data = range(10)\n",
"futures = client.map(inc, data)\n",
"futures"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Here a list of `Futures` are returned, one for each item in the sequence of arguments. "
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"futures"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"results = client.gather(futures)\n",
"# Same as results = [future.result() for future in futures]"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"results"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Notice what happens if we run the same calculation:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"data = range(10)\n",
"futures = client.map(inc, data)\n",
"futures"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The results are ready right away ... and ... the keys are the same. That's because all of the same objects are involved, and the results are still in the cluster memory.\n",
"\n",
"The `concurrent.futures` API even allows you to submit tasks based on the output of other tasks. This gives more flexibility in situations where the computations may evolve over time.\n",
"\n",
"```python\n",
"from dask.distributed import as_completed\n",
"\n",
"seq = as_completed(futures)\n",
"\n",
"for future in seq:\n",
" y = future.result()\n",
" if condition(y):\n",
" new_future = client.submit(...)\n",
" seq.add(new_future) # add back into the loop\n",
"```"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## dask_ml (and cuml)\n",
"\n",
"Building on these primitives Dask-ML provides scalable machine learning in Python using [Dask](https://dask.org/) alongside popular machine learning libraries like [Scikit-Learn](http://scikit-learn.org/).\n",
"\n",
"The idea is to support Pandas + Scikit style ML for parallel scenarios, with code patterns you're used to:\n",
"\n",
"```python\n",
"import dask.dataframe as dd\n",
"df = dd.read_parquet('...')\n",
"data = df[['age', 'income', 'married']]\n",
"labels = df['outcome']\n",
"\n",
"from dask_ml.linear_model import LogisticRegression\n",
"lr = LogisticRegression()\n",
"lr.fit(data, labels)\n",
"```\n",
"\n",
"Modern machine learning algorithms employ a wide variety of techniques. Scaling these requires a similarly wide variety of different approaches. Generally solutions fall into the following three categories:\n",
"\n",
"### Parallelize Scikit-Learn Directly\n",
"\n",
"Scikit-Learn already provides parallel computing on a single machine with [Joblib](http://joblib.readthedocs.io/en/latest/). Dask extends this parallelism to many machines in a cluster. This works well for modest data sizes but large computations, such as random forests, hyper-parameter optimization, and more.\n",
"\n",
"```python\n",
"from dask.distributed import Client\n",
"import joblib\n",
"\n",
"client = Client() # Connect to a Dask Cluster\n",
"\n",
"with joblib.parallel_backend('dask'):\n",
" # Your normal scikit-learn code here\n",
"```\n",
"\n",
"See [Dask-ML Joblib documentation](https://ml.dask.org/joblib.html) for more information.\n",
"\n",
"*Note that this is an active collaboration with the Scikit-Learn development team. This functionality is progressing quickly but is in a state of rapid change.*\n",
"\n",
"### Reimplement Scalable Algorithms with Dask Array\n",
"\n",
"Some machine learning algorithms are easy to write down as Numpy algorithms. In these cases we can replace Numpy arrays with Dask arrays to achieve scalable algorithms easily. This is employed for [linear models](https://ml.dask.org/glm.html), [pre-processing](https://ml.dask.org/preprocessing.html), and [clustering](https://ml.dask.org/clustering.html).\n",
"\n",
"```python\n",
"from dask_ml.preprocessing import Categorizer, DummyEncoder\n",
"from dask_ml.linear_model import LogisticRegression\n",
"\n",
"lr = LogisticRegression()\n",
"lr.fit(data, labels)\n",
"```\n",
"\n",
"### Partner with other distributed libraries\n",
"\n",
"Other machine learning libraries like XGBoost and TensorFlow already have distributed solutions that work quite well. Dask-ML makes no attempt to re-implement these systems. Instead, Dask-ML makes it easy to use normal Dask workflows to prepare and set up data, then it deploys XGBoost or Tensorflow *alongside* Dask, and hands the data over.\n",
"\n",
"```python\n",
"from dask_ml.xgboost import XGBRegressor\n",
"\n",
"est = XGBRegressor(...)\n",
"est.fit(train, train_labels)\n",
"```\n",
"\n",
"See [Dask-ML + XGBoost](https://ml.dask.org/xgboost.html) or [Dask-ML + TensorFlow](https://ml.dask.org/tensorflow.html) documentation for more information.\n",
"\n",
"### cuML\n",
"\n",
"[cuML](https://docs.rapids.ai/api/cuml/stable/) extends work done in Dask and Dask-ML to bring distributed execution to GPU accelerated algorithms.\n",
"\n",
"cuML is a suite of fast, GPU-accelerated machine learning algorithms designed for data science and analytical tasks. With API that mirrors Sklearn’s, and we provide practitioners with the easy fit-predict-transform paradigm without ever having to program on a GPU.\n",
"\n",
"The `cuml.dask` subpackage takes some of the GPU accelerated algorithms and provides [multi-node multi-GPU](https://docs.rapids.ai/api/cuml/stable/api.html#multi-node-multi-gpu-algorithms) implementations which use Dask to scale out over many machines and GPUs."
]
},
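{
"cell_type": "markdown",
"metadata": {},
"source": [
"As a minimal sketch of that pattern (assuming cuML is installed in this environment and reusing the `client` created earlier; the data and parameters are purely illustrative):\n",
"\n",
"```python\n",
"import cupy as cp\n",
"import dask.array as da\n",
"from cuml.dask.cluster import KMeans\n",
"\n",
"# Random CuPy-backed Dask array standing in for real data\n",
"rs = da.random.RandomState(RandomState=cp.random.RandomState)\n",
"X = rs.random(size=(10000, 20), chunks=(2500, 20))\n",
"\n",
"km = KMeans(n_clusters=4)\n",
"km.fit(X)              # each worker fits its own GPU-resident partitions\n",
"labels = km.predict(X) # returns a distributed array of cluster labels\n",
"```"
]
},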
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Distributed\n",
"\n",
"As we covered at the beginning Dask has the ability to run work on mulitple machines using the distributed scheduler.\n",
"\n",
"Until now we have actually been using the distributed scheduler for our work, but just on a single machine.\n",
"\n",
"When we instantiate a `LocalCUDACluster()` object with no arguments it will attempt to locate all GPUs and launch one Dask GPU worker per GPU.\n",
"\n",
"We then pass this cluster object to our `Client()` in order for work to be executed on our cluster.\n",
"\n",
"Let's explore the `LocalCUDACluster` object ourselves and see what it is doing."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"LocalCUDACluster?"
]
},
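{
"cell_type": "markdown",
"metadata": {},
"source": [
"For example, a sketch of constructing a cluster with a couple of explicit options (not meant to be run here, since a cluster is already running; the values are purely illustrative):\n",
"\n",
"```python\n",
"from dask.distributed import Client\n",
"from dask_cuda import LocalCUDACluster\n",
"\n",
"# Restrict the cluster to two specific GPUs and give each worker an RMM memory pool\n",
"cluster = LocalCUDACluster(CUDA_VISIBLE_DEVICES=\"0,1\", rmm_pool_size=\"10GB\")\n",
"client = Client(cluster)\n",
"```"
]
},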
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Our cluster object has attributes and methods which we can use to access information about our cluster. For instance we can get the log output from the scheduler and all the workers with the `get_logs()` method."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"cluster.get_logs()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"We can access the url that the Dask dashboard is being hosted at."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"cluster.dashboard_link"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Scalable clusters\n",
"\n",
"The `LocalCUDACluster` is a great tool for launching a Dask cluster on a single machine with one or more GPUs. But as your workloads grow you may need to spread to multiple machines. Dask has many subprojects which handle provisioning clusters on various platforms.\n",
"\n",
"We currently have cluster managers for [Kubernetes](https://kubernetes.dask.org/en/latest/), [Hadoop/Yarn](https://yarn.dask.org/en/latest/), [cloud platforms](https://cloudprovider.dask.org/en/latest/) and [batch systems including PBS, SLURM and SGE](http://jobqueue.dask.org/en/latest/).\n",
"\n",
"These cluster managers allow users who have access to resources such as these to bootstrap Dask clusters on to them. If an institution wishes to provide a central service that users can request Dask clusters from there is also [Dask Gateway](https://gateway.dask.org/).\n",
"\n",
"With some cluster managers it is possible to increase and descrease the number of workers either by calling `cluster.scale(n)` in your code where `n` is the desired number of workers. Or you can let Dask do this dynamically by calling `cluster.adapt(minimum=1, maximum=100)` where minimum and maximum are your preferred limits for Dask to abide to.\n",
"\n",
"It is always good to keep your minimum to at least 1 as Dask will start running work on a single worker in order to profile how long things take and extrapolate how many additional workers it thinks it needs. Getting new workers may take time depending on your setup so keeping this at 1 or above means this profilling will start immediately."
]
},
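{
"cell_type": "markdown",
"metadata": {},
"source": [
"As a sketch of what this looks like with one of those cluster managers (here `dask-jobqueue` on a hypothetical SLURM system; the resource values are purely illustrative):\n",
"\n",
"```python\n",
"from dask.distributed import Client\n",
"from dask_jobqueue import SLURMCluster\n",
"\n",
"cluster = SLURMCluster(cores=4, memory=\"16GB\", processes=1)\n",
"cluster.scale(10)                           # request 10 workers\n",
"# or: cluster.adapt(minimum=1, maximum=100) # let Dask scale within these limits\n",
"client = Client(cluster)\n",
"```"
]
},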
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Best Practices and Wrapup\n",
"\n",
"The Dask docs collect a number of best practices:\n",
"* Dataframe: https://docs.dask.org/en/latest/dataframe-best-practices.html\n",
"* Array: https://docs.dask.org/en/latest/array-best-practices.html\n",
"* Delayed: https://docs.dask.org/en/latest/delayed-best-practices.html \n",
"* Overall: https://docs.dask.org/en/latest/best-practices.html\n",
"\n",
"### Partitions/Chunks and Tasks\n",
"\n",
"Remember that Dask is a scheduler for regular Python functions operating on (and producing) regular Python objects.\n",
"\n",
"Your partitions, chunks, or data segments should be small enough to comfortably fit in RAM for each worker thread/core.\n",
"\n",
"That is...\n",
"* if you have a 1GB worker with 1 core, want to keep your partitions below 1GB\n",
"* with 2 x 1 GB workers with 1 cores, we still want partitions below 1GB\n",
"* with n x 4 GB workers with 2 cores per worker, we want partitions below 2 GB\n",
"\n",
"It's also good to take into account that more memory may be used for operations than the data chunk size itself, and that it's helpful to have a few chunks of data available to keep Dask's worker cores busy. \n",
"\n",
"So we might want to take those numbers above and make them 2-4x smaller (or, equivalently, create 2-4x as many partitions).\n",
"\n",
"Generally speaking, a lot of tasks is not a bad thing. Scheduling overhead for each additional task is typically less than 1 millisecond, and can be a lot less.\n",
"\n",
"That said, if you have, say, a billion tasks, those milliseconds will add up to minutes. In that case you may want to simplify your task graph or use larger (and hence fewer) partitions/chunks.\n",
"\n",
"### Caching (Persistence)\n",
"\n",
"The results of computations can be cached in the cluster memory, so that they are available for reuse, or for use to derive subsequent results.\n",
"\n",
"(See: `persist` which is available on `Client`, `Bag`, `Array`, `Dataframe`, etc.)\n",
"\n",
"Use caching wisely (not indiscriminately) and monitor memory usage using the `Workers` and `Memory` dashboard panes.\n",
"\n",
"### Data Formats and Compression\n",
"\n",
"Use compression schemes which are *splittable* and allow random access, so that processing your files in parallel is more flexible, e.g., Snappy, LZ4 instead of gzip.\n",
"\n",
"For datasets, consider files (and collections of files) in Parquet, ORC, HDF5, etc.\n",
"\n"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "RAPIDS Stable",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.8.8"
}
},
"nbformat": 4,
"nbformat_minor": 4
}