@mrocklin
Created May 11, 2016 22:00
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Join Dask.DataFrame against small Pandas DataFrame\n",
"\n",
"We join a small Pandas DataFrame (a dimension table) against a larger Dask DataFrame (a fact table). This is a common case in practice.\n",
"\n",
"We do this with a fake dataset."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Create fake dataset of products and categories"
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"456976"
]
},
"execution_count": 1,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"from itertools import product\n",
"\n",
"alphabet = 'ABCDEFGHIJKLMNOPQRSTUVWXYZ'\n",
"products = [''.join(x) for x in product(alphabet, alphabet, alphabet, alphabet)]\n",
"len(products)"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"['AAAA', 'AAAB', 'AAAC', 'AAAD', 'AAAE', 'AAAF', 'AAAG', 'AAAH']"
]
},
"execution_count": 2,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"products[:8]"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"['ZZZS', 'ZZZT', 'ZZZU', 'ZZZV', 'ZZZW', 'ZZZX', 'ZZZY', 'ZZZZ']"
]
},
"execution_count": 3,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"products[-8:]"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/html": [
"<div>\n",
"<table border=\"1\" class=\"dataframe\">\n",
" <thead>\n",
" <tr style=\"text-align: right;\">\n",
" <th></th>\n",
" <th>category</th>\n",
" <th>product</th>\n",
" </tr>\n",
" </thead>\n",
" <tbody>\n",
" <tr>\n",
" <th>0</th>\n",
" <td>101</td>\n",
" <td>AAAA</td>\n",
" </tr>\n",
" <tr>\n",
" <th>1</th>\n",
" <td>859</td>\n",
" <td>AAAB</td>\n",
" </tr>\n",
" <tr>\n",
" <th>2</th>\n",
" <td>851</td>\n",
" <td>AAAC</td>\n",
" </tr>\n",
" <tr>\n",
" <th>3</th>\n",
" <td>988</td>\n",
" <td>AAAD</td>\n",
" </tr>\n",
" <tr>\n",
" <th>4</th>\n",
" <td>78</td>\n",
" <td>AAAE</td>\n",
" </tr>\n",
" </tbody>\n",
"</table>\n",
"</div>"
],
"text/plain": [
" category product\n",
"0 101 AAAA\n",
"1 859 AAAB\n",
"2 851 AAAC\n",
"3 988 AAAD\n",
"4 78 AAAE"
]
},
"execution_count": 4,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"import pandas as pd\n",
"import numpy as np\n",
"\n",
"dimension = pd.DataFrame({'product': products, \n",
" 'category': np.random.randint(0, 1000, size=len(products))})\n",
"dimension.head()"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"def fake_data(size):\n",
"    import random\n",
"    import numpy as np\n",
"    return pd.DataFrame({'product': [''.join(random.sample(alphabet, 4)) for i in range(size)],\n",
"                         'value': np.random.exponential(100, size=size).astype(int),\n",
"                         'rating': np.random.normal(0, 1, size=size)})"
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {
"collapsed": false,
"scrolled": true
},
"outputs": [
{
"data": {
"text/html": [
"<div>\n",
"<table border=\"1\" class=\"dataframe\">\n",
" <thead>\n",
" <tr style=\"text-align: right;\">\n",
" <th></th>\n",
" <th>product</th>\n",
" <th>rating</th>\n",
" <th>value</th>\n",
" </tr>\n",
" </thead>\n",
" <tbody>\n",
" <tr>\n",
" <th>0</th>\n",
" <td>UADK</td>\n",
" <td>1.318256</td>\n",
" <td>78</td>\n",
" </tr>\n",
" <tr>\n",
" <th>1</th>\n",
" <td>AQLZ</td>\n",
" <td>-0.341347</td>\n",
" <td>23</td>\n",
" </tr>\n",
" <tr>\n",
" <th>2</th>\n",
" <td>ZMKG</td>\n",
" <td>0.908846</td>\n",
" <td>22</td>\n",
" </tr>\n",
" <tr>\n",
" <th>3</th>\n",
" <td>AWTD</td>\n",
" <td>-0.025588</td>\n",
" <td>19</td>\n",
" </tr>\n",
" <tr>\n",
" <th>4</th>\n",
" <td>PFQT</td>\n",
" <td>-0.957730</td>\n",
" <td>125</td>\n",
" </tr>\n",
" </tbody>\n",
"</table>\n",
"</div>"
],
"text/plain": [
" product rating value\n",
"0 UADK 1.318256 78\n",
"1 AQLZ -0.341347 23\n",
"2 ZMKG 0.908846 22\n",
"3 AWTD -0.025588 19\n",
"4 PFQT -0.957730 125"
]
},
"execution_count": 6,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"example = fake_data(5)\n",
"example"
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/html": [
"<div>\n",
"<table border=\"1\" class=\"dataframe\">\n",
" <thead>\n",
" <tr style=\"text-align: right;\">\n",
" <th></th>\n",
" <th>product</th>\n",
" <th>rating</th>\n",
" <th>value</th>\n",
" <th>category</th>\n",
" </tr>\n",
" </thead>\n",
" <tbody>\n",
" <tr>\n",
" <th>0</th>\n",
" <td>UADK</td>\n",
" <td>1.318256</td>\n",
" <td>78</td>\n",
" <td>203</td>\n",
" </tr>\n",
" <tr>\n",
" <th>1</th>\n",
" <td>AQLZ</td>\n",
" <td>-0.341347</td>\n",
" <td>23</td>\n",
" <td>895</td>\n",
" </tr>\n",
" <tr>\n",
" <th>2</th>\n",
" <td>ZMKG</td>\n",
" <td>0.908846</td>\n",
" <td>22</td>\n",
" <td>320</td>\n",
" </tr>\n",
" <tr>\n",
" <th>3</th>\n",
" <td>AWTD</td>\n",
" <td>-0.025588</td>\n",
" <td>19</td>\n",
" <td>189</td>\n",
" </tr>\n",
" <tr>\n",
" <th>4</th>\n",
" <td>PFQT</td>\n",
" <td>-0.957730</td>\n",
" <td>125</td>\n",
" <td>935</td>\n",
" </tr>\n",
" </tbody>\n",
"</table>\n",
"</div>"
],
"text/plain": [
" product rating value category\n",
"0 UADK 1.318256 78 203\n",
"1 AQLZ -0.341347 23 895\n",
"2 ZMKG 0.908846 22 320\n",
"3 AWTD -0.025588 19 189\n",
"4 PFQT -0.957730 125 935"
]
},
"execution_count": 7,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"example.merge(dimension, on='product', how='inner')"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Parallelize with Dask.DataFrame\n",
"\n",
"We do the same thing, but now in parallel with dask.dataframe.\n",
"\n",
"We start on a single machine. We'll repeat on a cluster later."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Make a fake fact table\n",
"\n",
"Normally we we would load our data in from some external source as in the following:\n",
"\n",
"```python\n",
"df = dask.dataframe.read_csv('hdfs://path/to/my/data/*.csv')\n",
"```\n",
"\n",
"Instead we create a `fake_data` function to produce each \"file\". You can ignore this if you have data elsewhere."
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"import dask.dataframe as dd\n",
"from dask import delayed\n",
"\n",
"partitions = []\n",
"for i in range(10):\n",
" partitions.append(delayed(fake_data)(10000))\n",
" \n",
"example = fake_data(1)\n",
"\n",
"df = dd.from_delayed(partitions, example)"
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"dd.DataFrame<from-de..., npartitions=10>"
]
},
"execution_count": 9,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"df"
]
},
{
"cell_type": "code",
"execution_count": 10,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/html": [
"<div>\n",
"<table border=\"1\" class=\"dataframe\">\n",
" <thead>\n",
" <tr style=\"text-align: right;\">\n",
" <th></th>\n",
" <th>product</th>\n",
" <th>rating</th>\n",
" <th>value</th>\n",
" </tr>\n",
" </thead>\n",
" <tbody>\n",
" <tr>\n",
" <th>0</th>\n",
" <td>ANUO</td>\n",
" <td>0.443365</td>\n",
" <td>236</td>\n",
" </tr>\n",
" <tr>\n",
" <th>1</th>\n",
" <td>AQDY</td>\n",
" <td>0.197511</td>\n",
" <td>61</td>\n",
" </tr>\n",
" <tr>\n",
" <th>2</th>\n",
" <td>BNVK</td>\n",
" <td>-1.088167</td>\n",
" <td>277</td>\n",
" </tr>\n",
" <tr>\n",
" <th>3</th>\n",
" <td>JOHE</td>\n",
" <td>0.869307</td>\n",
" <td>97</td>\n",
" </tr>\n",
" <tr>\n",
" <th>4</th>\n",
" <td>OCWS</td>\n",
" <td>0.649588</td>\n",
" <td>1</td>\n",
" </tr>\n",
" </tbody>\n",
"</table>\n",
"</div>"
],
"text/plain": [
" product rating value\n",
"0 ANUO 0.443365 236\n",
"1 AQDY 0.197511 61\n",
"2 BNVK -1.088167 277\n",
"3 JOHE 0.869307 97\n",
"4 OCWS 0.649588 1"
]
},
"execution_count": 10,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"df.head()"
]
},
{
"cell_type": "code",
"execution_count": 11,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/html": [
"<div>\n",
"<table border=\"1\" class=\"dataframe\">\n",
" <thead>\n",
" <tr style=\"text-align: right;\">\n",
" <th></th>\n",
" <th>product</th>\n",
" <th>rating</th>\n",
" <th>value</th>\n",
" <th>category</th>\n",
" </tr>\n",
" </thead>\n",
" <tbody>\n",
" <tr>\n",
" <th>0</th>\n",
" <td>XEMK</td>\n",
" <td>0.932693</td>\n",
" <td>49</td>\n",
" <td>848</td>\n",
" </tr>\n",
" <tr>\n",
" <th>1</th>\n",
" <td>SDPC</td>\n",
" <td>0.471996</td>\n",
" <td>14</td>\n",
" <td>957</td>\n",
" </tr>\n",
" <tr>\n",
" <th>2</th>\n",
" <td>ECXQ</td>\n",
" <td>0.124400</td>\n",
" <td>114</td>\n",
" <td>136</td>\n",
" </tr>\n",
" <tr>\n",
" <th>3</th>\n",
" <td>IOCP</td>\n",
" <td>-1.203039</td>\n",
" <td>66</td>\n",
" <td>348</td>\n",
" </tr>\n",
" <tr>\n",
" <th>4</th>\n",
" <td>YOEV</td>\n",
" <td>-1.279240</td>\n",
" <td>57</td>\n",
" <td>869</td>\n",
" </tr>\n",
" </tbody>\n",
"</table>\n",
"</div>"
],
"text/plain": [
" product rating value category\n",
"0 XEMK 0.932693 49 848\n",
"1 SDPC 0.471996 14 957\n",
"2 ECXQ 0.124400 114 136\n",
"3 IOCP -1.203039 66 348\n",
"4 YOEV -1.279240 57 869"
]
},
"execution_count": 11,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"joined = df.merge(dimension, how='inner', on='product')\n",
"joined.head()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Analyze the joined table\n",
"\n",
"We find the ten categories with the highest mean rating."
]
},
{
"cell_type": "code",
"execution_count": 12,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"category\n",
"312 0.345027\n",
"270 0.296863\n",
"794 0.295279\n",
"314 0.273500\n",
"475 0.272231\n",
"78 0.250727\n",
"281 0.250145\n",
"266 0.248127\n",
"297 0.245323\n",
"740 0.243576\n",
"Name: rating, dtype: float64"
]
},
"execution_count": 12,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"joined.groupby('category').rating.mean().nlargest(10).compute()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Distributed computing\n",
"\n",
"We connect to a cluster of workers and repeat the experiment at a larger scale (1000 partitions instead of 10)."
]
},
{
"cell_type": "code",
"execution_count": 13,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"<Executor: scheduler=localhost:8786 workers=20 threads=320>"
]
},
"execution_count": 13,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"from dask.distributed import Executor, progress\n",
"e = Executor('localhost:8786')\n",
"e"
]
},
{
"cell_type": "code",
"execution_count": 14,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"dfs = [delayed(fake_data)(10000) for i in range(1000)]\n",
"example = fake_data(1)\n",
"df = dd.from_delayed(dfs, example)"
]
},
{
"cell_type": "code",
"execution_count": 15,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"dd.DataFrame<from-de..., npartitions=1000>"
]
},
"execution_count": 15,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"df"
]
},
{
"cell_type": "code",
"execution_count": 16,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"joined = df.merge(dimension, how='inner', on='product')\n",
"result = joined.groupby('category').rating.mean().nlargest(10)\n",
"future = e.compute(result)\n",
"progress(future)"
]
},
{
"cell_type": "code",
"execution_count": 18,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/html": [
"<div>\n",
"<table border=\"1\" class=\"dataframe\">\n",
" <thead>\n",
" <tr style=\"text-align: right;\">\n",
" <th></th>\n",
" <th>category</th>\n",
" <th>product</th>\n",
" </tr>\n",
" </thead>\n",
" <tbody>\n",
" <tr>\n",
" <th>0</th>\n",
" <td>101</td>\n",
" <td>AAAA</td>\n",
" </tr>\n",
" <tr>\n",
" <th>1</th>\n",
" <td>859</td>\n",
" <td>AAAB</td>\n",
" </tr>\n",
" <tr>\n",
" <th>2</th>\n",
" <td>851</td>\n",
" <td>AAAC</td>\n",
" </tr>\n",
" <tr>\n",
" <th>3</th>\n",
" <td>988</td>\n",
" <td>AAAD</td>\n",
" </tr>\n",
" <tr>\n",
" <th>4</th>\n",
" <td>78</td>\n",
" <td>AAAE</td>\n",
" </tr>\n",
" </tbody>\n",
"</table>\n",
"</div>"
],
"text/plain": [
" category product\n",
"0 101 AAAA\n",
"1 859 AAAB\n",
"2 851 AAAC\n",
"3 988 AAAD\n",
"4 78 AAAE"
]
},
"execution_count": 18,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"dimension.head()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Future Work\n",
"\n",
"Looking at the profile plot, it's clear that performing the merge takes up most of the time. The merge is against a text column, which is stored with the Python object dtype; this is both slow and prevents multi-core work. I suspect that switching to a categorical or other numeric dtype would make this run much faster."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.5.1"
}
},
"nbformat": 4,
"nbformat_minor": 0
}
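
The Future Work note in the notebook suggests merging on a categorical or numeric key instead of the text column. The following is a minimal pandas-only sketch of that idea, not something the notebook itself ran. It assumes a smaller three-letter key space to stay quick, and the names `facts`, `key_dtype`, and `product_code` are made up for illustration. It only checks that a merge on shared integer category codes matches the merge on text; it does not measure speed.

```python
from itertools import product

import numpy as np
import pandas as pd

alphabet = 'ABCDEFGHIJKLMNOPQRSTUVWXYZ'
# Assumption: three-letter keys (17576 products) instead of the notebook's four.
products = [''.join(x) for x in product(alphabet, repeat=3)]

rng = np.random.RandomState(0)
dimension = pd.DataFrame({'product': products,
                          'category': rng.randint(0, 1000, size=len(products))})
facts = pd.DataFrame({'product': rng.choice(products, size=10000).tolist(),
                      'rating': rng.normal(0, 1, size=10000)})

# Baseline: merge on the text column, as the notebook does.
by_text = facts.merge(dimension, on='product', how='inner')

# Alternative: encode both key columns with one shared list of categories,
# so equal product names map to equal integer codes in both tables.
key_dtype = pd.api.types.CategoricalDtype(categories=products)
facts_codes = facts.assign(
    product_code=facts['product'].astype(key_dtype).cat.codes)
dimension_codes = dimension.assign(
    product_code=dimension['product'].astype(key_dtype).cat.codes
).drop(columns='product')

# Merge on the integer codes rather than on the strings.
by_code = facts_codes.merge(dimension_codes, on='product_code', how='inner')

# Both joins should yield the same mean rating per category.
mean_by_text = by_text.groupby('category')['rating'].mean()
mean_by_code = by_code.groupby('category')['rating'].mean()
same = np.allclose(mean_by_text.values, mean_by_code.values)
```

The shared `key_dtype` matters: if each table were converted to categorical independently, the same product name could receive different codes in the two tables and the join on codes would be wrong.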