@avriiil
Forked from mrocklin/tutorial.ipynb
Created November 2, 2022 11:38
{
"cells": [
{
"cell_type": "markdown",
"id": "70927395-731d-4619-837b-141306f4dc59",
"metadata": {
"tags": []
},
"source": [
"Dask for Parallel Python\n",
"========================\n",
"\n",
"Dask has many APIs; some are low-level and some are high-level:\n",
"\n",
"1. Low-level: lets you parallelize almost any Python code\n",
"\n",
" General purpose; you're in control\n",
" \n",
"2. High-level: mimics common PyData libraries like Numpy/Pandas/Xarray/XGBoost/...\n",
"\n",
" Special purpose; lots of automation\n",
" \n",
"In this notebook we're going to use *both* to work through a tabular data problem. In particular we'll use:\n",
"\n",
"1. Dask Futures, a low-level API that can do almost anything\n",
"2. Dask DataFrame, a high-level API that makes Pandas-at-scale workflows easy\n",
"\n",
"TODO: images and links"
]
},
{
"cell_type": "markdown",
"id": "34a29cd7-6209-4376-8064-300c728d8524",
"metadata": {},
"source": [
"Data and Problem\n",
"----------------\n",
"\n",
"We're going to play with the NYC Flights data showing flights in to and out of the NYC area. This data is stored as a directory of CSV files."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "ed62656b-65d6-46cc-a107-425ee20b4484",
"metadata": {},
"outputs": [],
"source": [
"# TODO: prep data"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "15da2ec0-6352-47df-a4d1-dd262a97f68c",
"metadata": {},
"outputs": [],
"source": [
"# TODO: list files"
]
},
{
"cell_type": "markdown",
"id": "98a25e50-c6c2-41fa-ac78-4769f92ac1b6",
"metadata": {},
"source": [
"Let's work together to better understand the performance of the airports in the NYC area (EWR, JFK, LGA).\n",
"\n",
"We'll do this by asking increasingly complicated questions like the following:\n",
"\n",
"1. How many flights arrived or took off per year?\n",
"2. What was the longest arrival delay?\n",
"3. Which airport has the best record for on-time departures?\n",
"\n",
"We'll do this first sequentially, then in parallel using the low-level Dask Futures API, and finally in parallel using the high-level Dask DataFrame API."
]
},
{
"cell_type": "markdown",
"id": "9cb6b38e-5146-44f0-b3d2-3fdf63ac0c97",
"metadata": {},
"source": [
"How many flights took off per year?\n",
"-----------------------------------\n",
"\n",
"### Sequential code"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "bb457571-5b6d-4f57-9c76-6bfbe0b5e5c8",
"metadata": {},
"outputs": [],
"source": [
"import os, glob\n",
"\n",
"filenames = glob.glob(\n",
" os.path.join('..', 'data', 'nycflights', \"*.csv\")\n",
")\n",
"\n",
"filenames"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "f25e59de-87bd-4425-a597-a08b893a01e1",
"metadata": {},
"outputs": [],
"source": [
"import pandas as pd"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "1799f9fc-cb29-41cd-a370-1108452b68cd",
"metadata": {},
"outputs": [],
"source": [
"%%time\n",
"\n",
"years = []\n",
"lengths = []\n",
"for filename in filenames:\n",
" year = int(os.path.split(filename)[-1].split(\".\")[0])\n",
" df = pd.read_csv(filename)\n",
" length = len(df)\n",
" \n",
" years.append(year)\n",
" lengths.append(length)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "2ace1613-b9ff-4c9d-97bd-ba35649cd857",
"metadata": {},
"outputs": [],
"source": [
"for year, length in zip(years, lengths):\n",
" print(year, length)"
]
},
{
"cell_type": "markdown",
"id": "2c92c3b8-122a-42d4-8e4e-0a6e7dd5949b",
"metadata": {},
"source": [
"### Parallel Code with low-level Futures\n",
"\n",
"This is an example of an embarrassingly parallel computation: we want to run the same Python code on many pieces of data. This simple pattern comes up all the time.\n",
"\n",
"Let's learn how to do this with Dask futures.\n",
"\n",
"First, we're going to see a very simple example, then we'll try to parallelize the code above.\n"
]
},
{
"cell_type": "markdown",
"id": "b0e9ba65-96b1-4ce9-a1e6-1bd2bc181a4b",
"metadata": {},
"source": [
"### Set up a Dask cluster locally"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "c0a7de0b-8888-419a-bf29-56ae0f4d5df4",
"metadata": {},
"outputs": [],
"source": [
"from dask.distributed import Client\n",
"\n",
"client = Client(n_workers=4, threads_per_worker=1)"
]
},
{
"cell_type": "markdown",
"id": "c57cffc0-1f17-4306-90c5-55c50a0b76b4",
"metadata": {},
"source": [
"### Dask Futures introduction"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "f6fcb6c4-6fac-4261-b015-38f1dc49acc2",
"metadata": {},
"outputs": [],
"source": [
"import time\n",
"\n",
"def slowinc(x, delay=1):\n",
" time.sleep(delay)\n",
" return x + 1"
]
},
{
"cell_type": "markdown",
"id": "c940c8c9-0ad8-4fc5-b028-0bd94ae1553d",
"metadata": {},
"source": [
"Dask futures let us run Python functions remotely on parallel hardware. Rather than calling the function directly:"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "36dd72c3-2edd-4ebe-8be5-00208937dbee",
"metadata": {},
"outputs": [],
"source": [
"%%time\n",
"\n",
"y = slowinc(10)\n",
"y"
]
},
{
"cell_type": "markdown",
"id": "d029deff-6e2e-427d-a1c5-0d82f1e243f0",
"metadata": {},
"source": [
"We can ask Dask to run the function `slowinc` on the data `10` by passing both as arguments to the `client.submit` method. The first argument is the function to call; the remaining arguments are passed through to that function."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "6598a837-6ff2-44ea-9d3f-113ae10c80bd",
"metadata": {},
"outputs": [],
"source": [
"%%time\n",
"\n",
"y_future = client.submit(slowinc, 10)\n",
"y_future"
]
},
{
"cell_type": "markdown",
"id": "f0ad02e3-6126-42ee-94f0-fcb8942e91e0",
"metadata": {},
"source": [
"You'll notice that that call returned immediately. That's because all we did was submit the `slowinc` function to run on Dask; `client.submit` returns a `Future`, a pointer to where the result will eventually live.\n",
"\n",
"We can retrieve the result by calling `future.result()`:"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "8a2781e2-ad37-4138-ab4c-8e1d42f3b789",
"metadata": {},
"outputs": [],
"source": [
"y_future"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "298a8ba1-7a3a-40d6-947d-a577ef449686",
"metadata": {},
"outputs": [],
"source": [
"y = y_future.result()\n",
"y"
]
},
{
"cell_type": "markdown",
"id": "2ad94883-2472-4f57-8017-9434d20a3379",
"metadata": {},
"source": [
"## Submit many tasks in a loop\n",
"\n",
"We can submit many function calls at once, and then gather their results when they finish. This lets us easily parallelize simple for loops."
]
},
{
"cell_type": "markdown",
"id": "3b36fcfc-bfad-4388-8c47-58bcb2e7b5ff",
"metadata": {},
"source": [
"### Sequential code"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "e92b4e40-37d0-475b-a143-e8fd064f22a0",
"metadata": {},
"outputs": [],
"source": [
"%%time \n",
"\n",
"data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]\n",
"results = []\n",
"\n",
"for x in data:\n",
" result = slowinc(x)\n",
" results.append(result)\n",
" \n",
"results"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "434643e7-8c18-4ff8-bfad-48c92b553c7f",
"metadata": {},
"outputs": [],
"source": [
"%%time\n",
"\n",
"data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]\n",
"futures = []\n",
"\n",
"for x in data:\n",
" future = client.submit(slowinc, x)\n",
" futures.append(future)\n",
" \n",
"results = [future.result() for future in futures]\n",
"results"
]
},
{
"cell_type": "markdown",
"id": "5c6a7083-784a-42e8-8b2f-a36fa8b8b035",
"metadata": {},
"source": [
"Back to flights\n",
"---------------\n",
"\n",
"Given the pattern above, can you parallelize the sequential work below?\n",
"\n",
"Some things to think about:\n",
"\n",
"1. Which of the Python calls do you want to offload to the Dask cluster?\n",
"\n",
" (there are many right answers here)\n",
" \n",
"2. How much more quickly do you think it will run?\n",
"\n",
" (only if you like thinking about performance)"
]
},
{
"cell_type": "markdown",
"id": "2a95a0d5-4c05-480b-99c0-e7c991f2200a",
"metadata": {},
"source": [
"### Sequential code"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "c256bdd9-1bea-4d77-908b-9e059db3bbd8",
"metadata": {},
"outputs": [],
"source": [
"%%time\n",
"\n",
"years = []\n",
"lengths = []\n",
"for filename in filenames:\n",
" year = int(os.path.split(filename)[-1].split(\".\")[0])\n",
" df = pd.read_csv(filename)\n",
" length = len(df)\n",
" \n",
" years.append(year)\n",
" lengths.append(length)"
]
},
{
"cell_type": "markdown",
"id": "7c6b908e-474a-402b-a4e3-23a14a8976eb",
"metadata": {},
"source": [
"### Exercise: Parallel code"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "60cce661-4e48-4349-8d45-61f4d6b34cd1",
"metadata": {},
"outputs": [],
"source": [
"%%time\n",
"\n",
"...\n",
"\n",
"for filename in filenames:\n",
" ...\n",
" "
]
},
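{
"cell_type": "markdown",
"id": "added-parallel-count-md",
"metadata": {},
"source": [
"One possible solution, as a sketch: offload `pd.read_csv` and `len` to the cluster, and only gather the small integer lengths at the end. (There are other valid splits, e.g. reading remotely but measuring lengths locally.)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "added-parallel-count-code",
"metadata": {},
"outputs": [],
"source": [
"%%time\n",
"\n",
"years = []\n",
"length_futures = []\n",
"for filename in filenames:\n",
"    year = int(os.path.split(filename)[-1].split(\".\")[0])\n",
"    df_future = client.submit(pd.read_csv, filename)  # read each CSV on the cluster\n",
"    length_future = client.submit(len, df_future)     # compute the length remotely too\n",
"    years.append(year)\n",
"    length_futures.append(length_future)\n",
"\n",
"lengths = client.gather(length_futures)  # only small integers travel back"
]
},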
{
"cell_type": "markdown",
"id": "9343e187-3863-4741-a91f-d4601579b650",
"metadata": {},
"source": [
"## What is the longest delay?"
]
},
{
"cell_type": "markdown",
"id": "c426c379-bb09-4cd0-a6e8-34143aa89c22",
"metadata": {},
"source": [
"### Sequential code"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "376d6bb0-f764-4860-b151-489e96250321",
"metadata": {},
"outputs": [],
"source": [
"%%time\n",
"\n",
"delays = []\n",
"for filename in filenames:\n",
" df = pd.read_csv(filename)\n",
" delay = df.ArrDelay.max()\n",
" delays.append(delay)\n",
" \n",
"max(delays)"
]
},
{
"cell_type": "markdown",
"id": "e6434cd8-11c9-4ff3-9a07-0026b696ab7e",
"metadata": {},
"source": [
"### Exercise: Parallel code\n",
"\n",
"This should be similar to, and maybe a bit simpler than, the exercise above.\n",
"\n",
"If you want to play around, you can run the final `max` call either locally on your laptop, or remotely by submitting it as a task on the other Dask futures. Dask lets you submit new tasks on the results of existing tasks, without ever bringing the intermediate data back to your machine. For example, with some hypothetical functions `func1`, `func2`, and `aggregate`:\n",
"\n",
"```python\n",
"futures_1 = [client.submit(func1, arg) for arg in data]\n",
"futures_2 = [client.submit(func2, future) for future in futures_1]\n",
"future = client.submit(aggregate, futures_2)\n",
"future.result()\n",
"```"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "f9198bd4-9c10-4b17-929f-36fa2ad09981",
"metadata": {},
"outputs": [],
"source": [
"%%time\n",
"\n",
"...\n",
"\n",
"for filename in filenames:\n",
" ...\n",
" \n"
]
},
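{
"cell_type": "markdown",
"id": "added-parallel-max-md",
"metadata": {},
"source": [
"One possible solution, sketched: compute each file's maximum `ArrDelay` remotely, then take the final `max` locally over the gathered per-file results. (You could equally submit the final `max` as another task on the futures.)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "added-parallel-max-code",
"metadata": {},
"outputs": [],
"source": [
"%%time\n",
"\n",
"def max_arr_delay(filename):\n",
"    df = pd.read_csv(filename)\n",
"    return df.ArrDelay.max()\n",
"\n",
"delay_futures = [client.submit(max_arr_delay, fn) for fn in filenames]\n",
"max(client.gather(delay_futures))"
]
},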
{
"cell_type": "markdown",
"id": "51be74d5-7415-4cfd-aaa5-6fd05bc6af1c",
"metadata": {},
"source": [
"## How many flights total?\n",
"\n",
"How many flights were there in the entire dataset?\n",
"\n",
"We're going to ask you to write both the sequential and the parallel code this time."
]
},
{
"cell_type": "markdown",
"id": "b0f35d1a-da99-4531-9103-90f98162d38f",
"metadata": {},
"source": [
"### Sequential code"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "b756fcc7-a416-4394-ba61-139a5a0db3ea",
"metadata": {},
"outputs": [],
"source": [
"%%time\n",
"\n",
"...\n",
"\n",
"for filename in filenames:\n",
" ..."
]
},
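{
"cell_type": "markdown",
"id": "added-seq-total-md",
"metadata": {},
"source": [
"A sequential version might look like this: read each file and accumulate a running total of row counts."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "added-seq-total-code",
"metadata": {},
"outputs": [],
"source": [
"%%time\n",
"\n",
"total = 0\n",
"for filename in filenames:\n",
"    df = pd.read_csv(filename)\n",
"    total += len(df)  # one flight per row\n",
"\n",
"total"
]
},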
{
"cell_type": "markdown",
"id": "69109334-b5d3-485b-b713-c118a82d136b",
"metadata": {},
"source": [
"### Parallel code\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "5293bfbf-4967-4122-ac6e-05ca4d10e49d",
"metadata": {},
"outputs": [],
"source": [
"%%time\n",
"\n",
"...\n",
"\n",
"for filename in filenames:\n",
" ...\n",
" \n",
" "
]
},
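{
"cell_type": "markdown",
"id": "added-par-total-md",
"metadata": {},
"source": [
"And a parallel sketch: submit one length computation per file, then submit the final `sum` as a task on those futures, so only a single number ever comes back to the local machine."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "added-par-total-code",
"metadata": {},
"outputs": [],
"source": [
"%%time\n",
"\n",
"length_futures = [\n",
"    client.submit(len, client.submit(pd.read_csv, filename))\n",
"    for filename in filenames\n",
"]\n",
"total_future = client.submit(sum, length_futures)  # the sum runs on the cluster\n",
"total_future.result()"
]
},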
{
"cell_type": "markdown",
"id": "ba019dfc-58ad-4948-ba13-998b5ce149f2",
"metadata": {},
"source": [
"## Dask DataFrame\n",
"\n",
"This is great. We could keep asking increasingly complex questions, and you could keep writing increasingly complex parallel algorithms like this.\n",
"\n",
"Fortunately, someone has already done this work for Pandas and put all of these algorithms into the `dask.dataframe` library."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "e6186cd1-4aac-4332-bcc8-3274075df754",
"metadata": {},
"outputs": [],
"source": [
"import dask.dataframe as dd\n",
"\n",
"df = dd.read_csv(\n",
" os.path.join(\"..\", \"data\", \"nycflights\", \"*.csv\"),\n",
" parse_dates={'Date': [0, 1, 2]},\n",
" usecols=[\"ArrTime\", \"UniqueCarrier\", \"ActualElapsedTime\", \"ArrDelay\", \"DepDelay\", \"Origin\", \"Dest\", \"Distance\", \"Cancelled\"],\n",
")\n",
"df.head()"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "6d59663e-cd04-4b4d-b370-318b9c80cb7d",
"metadata": {},
"outputs": [],
"source": [
"%%time\n",
"\n",
"df.ArrDelay.max().compute()"
]
},
{
"cell_type": "markdown",
"id": "2b82e94a-b8d0-440a-9935-b0f252f3a5a5",
"metadata": {},
"source": [
"Dask DataFrame looks a lot like Pandas. The biggest differences are that:\n",
"\n",
"1. It runs in parallel on top of Dask\n",
"2. You have to call `.compute()` when you want a result delivered back to your machine as a normal, in-memory object."
]
},
{
"cell_type": "markdown",
"id": "a8aa93f1-9037-4e4f-96a5-44d7162559f9",
"metadata": {},
"source": [
"## Exercise: Average arrival delay by airport\n",
"\n",
"What is the average arrival delay for flights departing from the three major airports: EWR (Newark), JFK, and LGA (LaGuardia)?\n",
"\n",
"You'll want to look at the `Origin` and `ArrDelay` columns."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "cc7f011f-00c8-4192-9875-aef446d00b33",
"metadata": {},
"outputs": [],
"source": [
"df.head()"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "57453040-1ef0-4978-8440-f9d8b3018661",
"metadata": {},
"outputs": [],
"source": []
},
{
"cell_type": "code",
"execution_count": null,
"id": "b03408e0-3020-4897-844a-67c8fa5ced9a",
"metadata": {},
"outputs": [],
"source": []
},
{
"cell_type": "code",
"execution_count": null,
"id": "e73b372a-0e58-4667-a02a-077ed0f18a6c",
"metadata": {},
"outputs": [],
"source": []
},
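{
"cell_type": "markdown",
"id": "added-groupby-mean-md",
"metadata": {},
"source": [
"One way to answer this, as a sketch: group by `Origin` and take the mean of `ArrDelay`."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "added-groupby-mean-code",
"metadata": {},
"outputs": [],
"source": [
"# average arrival delay per origin airport\n",
"df.groupby(\"Origin\").ArrDelay.mean().compute()"
]
},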
{
"cell_type": "markdown",
"id": "26e881db-3f39-4d82-840d-41fe205f0dd0",
"metadata": {},
"source": [
"## Challenge Exercise 1: Compute Quantiles\n",
"\n",
"Rather than just the average, see what 10%, 50%, and 90% quantiles are like for each airport."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "b88e7f6d-44bf-4156-b673-af9eafb39bf1",
"metadata": {},
"outputs": [],
"source": []
},
{
"cell_type": "code",
"execution_count": null,
"id": "843d18e8-29c9-4b20-bebf-ab773d669ac5",
"metadata": {},
"outputs": [],
"source": []
},
{
"cell_type": "code",
"execution_count": null,
"id": "48a64851-a62b-42df-8920-d6f4ee673cf8",
"metadata": {},
"outputs": [],
"source": []
},
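{
"cell_type": "markdown",
"id": "added-quantiles-md",
"metadata": {},
"source": [
"A sketch: compute `Series.quantile` for each airport and ask for everything in a single `dask.compute` call. (Note that quantiles on distributed data are approximate by default.)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "added-quantiles-code",
"metadata": {},
"outputs": [],
"source": [
"import dask\n",
"\n",
"quantiles = {\n",
"    airport: df[df.Origin == airport].ArrDelay.quantile([0.1, 0.5, 0.9])\n",
"    for airport in [\"EWR\", \"JFK\", \"LGA\"]\n",
"}\n",
"dask.compute(quantiles)  # one call shares the CSV-reading work"
]
},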
{
"cell_type": "markdown",
"id": "fc6407f6-d911-408d-837e-342bd998b8a2",
"metadata": {},
"source": [
"## Challenge Exercise 2: Compute Average with Dask Futures\n",
"\n",
"Do the same exercise as the normal exercise above but manually with the low-level Dask futures API."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "a93ff0b1-709e-40f8-98c7-0731fc6080bb",
"metadata": {},
"outputs": [],
"source": []
},
{
"cell_type": "code",
"execution_count": null,
"id": "c3423939-dc3f-4f70-8fd8-6e7c26909019",
"metadata": {},
"outputs": [],
"source": []
},
{
"cell_type": "code",
"execution_count": null,
"id": "cbe21d9f-662f-4ca7-8811-d277693c9f7e",
"metadata": {},
"outputs": [],
"source": []
},
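{
"cell_type": "markdown",
"id": "added-futures-mean-md",
"metadata": {},
"source": [
"A sketch with futures: an average is a sum divided by a count, so we can compute per-file sums and counts of `ArrDelay` grouped by `Origin` on the cluster, then combine them locally. The helper function `sums_and_counts` below is our own, just for illustration."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "added-futures-mean-code",
"metadata": {},
"outputs": [],
"source": [
"def sums_and_counts(filename):\n",
"    df = pd.read_csv(filename)\n",
"    g = df.groupby(\"Origin\").ArrDelay\n",
"    return g.sum(), g.count()\n",
"\n",
"futures = [client.submit(sums_and_counts, fn) for fn in filenames]\n",
"results = client.gather(futures)\n",
"\n",
"total = sum(s for s, c in results)  # per-airport sums, added across files\n",
"count = sum(c for s, c in results)  # per-airport counts\n",
"total / count"
]
},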
{
"cell_type": "markdown",
"id": "88235838-e61a-484a-a44a-cc89415e3ccb",
"metadata": {},
"source": [
"## Managing Memory\n",
"\n",
"When we run several operations like the following, we're being inefficient, because each computation repeats the same work:"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "5d9a63f9-222a-4be2-a567-6192bb9f1a93",
"metadata": {},
"outputs": [],
"source": [
"%%time\n",
"\n",
"print(df[df.Origin == \"EWR\"].ArrDelay.mean().compute())\n",
"\n",
"print(df[df.Origin == \"LGA\"].ArrDelay.mean().compute())\n",
"\n",
"print(df[df.Origin == \"JFK\"].ArrDelay.mean().compute())"
]
},
{
"cell_type": "markdown",
"id": "64146cda-0cab-4d42-ba83-075321a0588c",
"metadata": {},
"source": [
"Each of those computations spends most of its time re-reading the same CSV files.\n",
"\n",
"There are two ways to address this."
]
},
{
"cell_type": "markdown",
"id": "c77292dd-16d5-4bc7-ba4a-979b47648df6",
"metadata": {},
"source": [
"### 1. Ask for everything at once"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "c0a1f348-1c39-43aa-be14-ab3949abcf85",
"metadata": {},
"outputs": [],
"source": [
"%%time\n",
"\n",
"ewr = df[df.Origin == \"EWR\"].ArrDelay.mean()\n",
"lga = df[df.Origin == \"LGA\"].ArrDelay.mean()\n",
"jfk = df[df.Origin == \"JFK\"].ArrDelay.mean()\n",
"\n",
"import dask\n",
"\n",
"dask.compute(ewr, lga, jfk)"
]
},
{
"cell_type": "markdown",
"id": "2d7cf511-cddb-4b69-a220-d284a7025546",
"metadata": {},
"source": [
"### 2. Persist data in memory"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "db08b125-05d9-42fd-bed4-5e756cc1820a",
"metadata": {},
"outputs": [],
"source": [
"df = df.persist()"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "970cc5cc-ad38-4854-8e4a-b3250d7d6f55",
"metadata": {},
"outputs": [],
"source": [
"%%time\n",
"\n",
"print(df[df.Origin == \"EWR\"].ArrDelay.mean().compute())\n",
"print(df[df.Origin == \"LGA\"].ArrDelay.mean().compute())\n",
"print(df[df.Origin == \"JFK\"].ArrDelay.mean().compute())"
]
},
{
"cell_type": "markdown",
"id": "8cc51ca2-e460-402e-afa4-46033148cb6d",
"metadata": {},
"source": [
"# Next Steps\n",
"\n",
"In the next notebooks we'll expand on the lessons learned here in two ways:\n",
"\n",
"1. Use Dask Futures for more advanced applications beyond dataframes\n",
"2. Scale up to distributed clusters"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python [conda env:play]",
"language": "python",
"name": "conda-env-play-py"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.10.6"
}
},
"nbformat": 4,
"nbformat_minor": 5
}