@mrocklin
Last active April 19, 2016 17:26
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## SKLearn Random Forests + Dask.Distributed Futures\n",
"\n",
"We naively parallelize a random forest fit using dask.distributed Futures."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### First single machine"
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"from sklearn.datasets import make_classification\n",
"from sklearn.ensemble import RandomForestClassifier\n",
"from sklearn.cross_validation import train_test_split  # sklearn.model_selection in 0.18+"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"X, y = make_classification(n_samples=100000)\n",
"X_train, X_test, y_train, y_test = train_test_split(X, y)"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"16.0"
]
},
"execution_count": 3,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"X.nbytes / 1e6"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"0.94992\n",
"CPU times: user 470 ms, sys: 11.8 ms, total: 482 ms\n",
"Wall time: 289 ms\n"
]
}
],
"source": [
"%%time\n",
"est = RandomForestClassifier(n_estimators=1, max_depth=10)\n",
"est.fit(X_train, y_train)\n",
"print(est.score(X_test, y_test))"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"0.96316\n",
"CPU times: user 17.2 s, sys: 23.6 ms, total: 17.2 s\n",
"Wall time: 17.2 s\n"
]
}
],
"source": [
"%%time\n",
"est = RandomForestClassifier(n_estimators=64, max_depth=10)\n",
"est.fit(X_train, y_train)\n",
"print(est.score(X_test, y_test))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Analysis\n",
"\n",
"OK, so the extra work here (64 trees instead of one) improves accuracy only marginally (0.950 to 0.963). Still, it's a valid computational exercise if we don't mind ignoring actual utility. Hopefully this is a stand-in for something more useful.\n",
"\n",
"I would love to find a more meaningful computation. Suggestions welcome."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Distributed Cluster\n",
"\n",
"So let's do roughly the same thing, but now on a cluster of eight `m4.xlarge` instances (4 cores, 15 GB RAM each)."
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"<Executor: scheduler=54.173.229.79:8786 workers=32 threads=32>"
]
},
"execution_count": 9,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"from distributed import Executor, progress, wait\n",
"e = Executor('54.173.229.79:8786')\n",
"e"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Move data to all nodes\n",
"\n",
"This is small data, so we'll broadcast it out to all of the workers."
]
},
{
"cell_type": "code",
"execution_count": 10,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"CPU times: user 24.5 ms, sys: 7.98 ms, total: 32.5 ms\n",
"Wall time: 449 ms\n"
]
}
],
"source": [
"%%time\n",
"XX, yy = e.scatter([X_train, y_train], broadcast=True)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Fit single-tree RandomForestClassifier 64 times"
]
},
{
"cell_type": "code",
"execution_count": 11,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"CPU times: user 29.4 ms, sys: 65 µs, total: 29.5 ms\n",
"Wall time: 955 ms\n"
]
}
],
"source": [
"%%time\n",
"def fit(x, y):\n",
"    est = RandomForestClassifier(n_estimators=1, max_depth=10)\n",
"    est.fit(x, y)\n",
"    return est\n",
"\n",
"estimators = [e.submit(fit, XX, yy, pure=False) for i in range(64)]\n",
"# progress(estimators)\n",
"wait(estimators)"
]
},
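{
"cell_type": "markdown",
"metadata": {},
"source": [
"Note the `pure=False` above: `e.submit` deduplicates tasks by hashing the function and its arguments, so without it all 64 identical `fit(XX, yy)` calls would collapse into a single task and we'd train only one tree. Roughly (a sketch, not run here):\n",
"\n",
"```python\n",
"futures = [e.submit(fit, XX, yy) for i in range(64)]  # without pure=False\n",
"len({f.key for f in futures})                         # every future shares one key\n",
"```"
]
},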
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Gather data back to local machine"
]
},
{
"cell_type": "code",
"execution_count": 12,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"RandomForestClassifier(bootstrap=True, class_weight=None, criterion='gini',\n",
" max_depth=10, max_features='auto', max_leaf_nodes=None,\n",
" min_samples_leaf=1, min_samples_split=2,\n",
" min_weight_fraction_leaf=0.0, n_estimators=1, n_jobs=1,\n",
" oob_score=False, random_state=None, verbose=0,\n",
" warm_start=False)\n",
"CPU times: user 12.7 ms, sys: 4.07 ms, total: 16.8 ms\n",
"Wall time: 71.1 ms\n"
]
}
],
"source": [
"%%time\n",
"results = e.gather(estimators)\n",
"print(results[0])"
]
},
{
"cell_type": "code",
"execution_count": 13,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"from copy import copy\n",
"def combine(estimators):\n",
"    \"\"\" Stolen from @ogrisel's stackoverflow answer\n",
"\n",
"    http://stackoverflow.com/a/25925913/616616 \"\"\"\n",
"    result = copy(estimators[0])\n",
"    result.estimators_ = sum([est.estimators_ for est in estimators], [])\n",
"    return result"
]
},
{
"cell_type": "code",
"execution_count": 14,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"0.96387999999999996"
]
},
"execution_count": 14,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"est = combine(results)\n",
"est.score(X_test, y_test)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Analysis\n",
"\n",
"Looking at the distributed web UI we find that inter-node communication (including the initial scatter, which is quite cheap) is trivial relative to computation times. Computations do seem slightly more expensive on the worker nodes, and I'm not sure why, but generally we're within a small fraction of the expected time.\n",
"\n",
"As before, no real predictive power has been achieved. Oh well."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Real data: NYCTaxi living on S3\n",
"\n",
"This is something like 60GB in RAM.\n",
"\n",
"We'll try to predict `passenger_count` given the other numeric columns."
]
},
{
"cell_type": "code",
"execution_count": 15,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Setting global dask scheduler to use distributed\n",
"CPU times: user 594 ms, sys: 54.7 ms, total: 649 ms\n",
"Wall time: 1min 13s\n"
]
}
],
"source": [
"%%time\n",
"\n",
"from distributed import s3\n",
"\n",
"dfs = s3.read_csv('dask-data/nyc-taxi/2015', \n",
" parse_dates=['tpep_pickup_datetime', 'tpep_dropoff_datetime'],\n",
" collection=False, lazy=False)\n",
"# progress(dfs)\n",
"wait(dfs)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Fit/predict a single dataframe on a single machine"
]
},
{
"cell_type": "code",
"execution_count": 16,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/html": [
"<div>\n",
"<table border=\"1\" class=\"dataframe\">\n",
" <thead>\n",
" <tr style=\"text-align: right;\">\n",
" <th></th>\n",
" <th>VendorID</th>\n",
" <th>tpep_pickup_datetime</th>\n",
" <th>tpep_dropoff_datetime</th>\n",
" <th>passenger_count</th>\n",
" <th>trip_distance</th>\n",
" <th>pickup_longitude</th>\n",
" <th>pickup_latitude</th>\n",
" <th>RateCodeID</th>\n",
" <th>store_and_fwd_flag</th>\n",
" <th>dropoff_longitude</th>\n",
" <th>dropoff_latitude</th>\n",
" <th>payment_type</th>\n",
" <th>fare_amount</th>\n",
" <th>extra</th>\n",
" <th>mta_tax</th>\n",
" <th>tip_amount</th>\n",
" <th>tolls_amount</th>\n",
" <th>improvement_surcharge</th>\n",
" <th>total_amount</th>\n",
" </tr>\n",
" </thead>\n",
" <tbody>\n",
" <tr>\n",
" <th>0</th>\n",
" <td>2</td>\n",
" <td>2015-01-15 19:05:39</td>\n",
" <td>2015-01-15 19:23:42</td>\n",
" <td>1</td>\n",
" <td>1.59</td>\n",
" <td>-73.993896</td>\n",
" <td>40.750111</td>\n",
" <td>1</td>\n",
" <td>N</td>\n",
" <td>-73.974785</td>\n",
" <td>40.750618</td>\n",
" <td>1</td>\n",
" <td>12.0</td>\n",
" <td>1.0</td>\n",
" <td>0.5</td>\n",
" <td>3.25</td>\n",
" <td>0.0</td>\n",
" <td>0.3</td>\n",
" <td>17.05</td>\n",
" </tr>\n",
" <tr>\n",
" <th>1</th>\n",
" <td>1</td>\n",
" <td>2015-01-10 20:33:38</td>\n",
" <td>2015-01-10 20:53:28</td>\n",
" <td>1</td>\n",
" <td>3.30</td>\n",
" <td>-74.001648</td>\n",
" <td>40.724243</td>\n",
" <td>1</td>\n",
" <td>N</td>\n",
" <td>-73.994415</td>\n",
" <td>40.759109</td>\n",
" <td>1</td>\n",
" <td>14.5</td>\n",
" <td>0.5</td>\n",
" <td>0.5</td>\n",
" <td>2.00</td>\n",
" <td>0.0</td>\n",
" <td>0.3</td>\n",
" <td>17.80</td>\n",
" </tr>\n",
" <tr>\n",
" <th>2</th>\n",
" <td>1</td>\n",
" <td>2015-01-10 20:33:38</td>\n",
" <td>2015-01-10 20:43:41</td>\n",
" <td>1</td>\n",
" <td>1.80</td>\n",
" <td>-73.963341</td>\n",
" <td>40.802788</td>\n",
" <td>1</td>\n",
" <td>N</td>\n",
" <td>-73.951820</td>\n",
" <td>40.824413</td>\n",
" <td>2</td>\n",
" <td>9.5</td>\n",
" <td>0.5</td>\n",
" <td>0.5</td>\n",
" <td>0.00</td>\n",
" <td>0.0</td>\n",
" <td>0.3</td>\n",
" <td>10.80</td>\n",
" </tr>\n",
" <tr>\n",
" <th>3</th>\n",
" <td>1</td>\n",
" <td>2015-01-10 20:33:39</td>\n",
" <td>2015-01-10 20:35:31</td>\n",
" <td>1</td>\n",
" <td>0.50</td>\n",
" <td>-74.009087</td>\n",
" <td>40.713818</td>\n",
" <td>1</td>\n",
" <td>N</td>\n",
" <td>-74.004326</td>\n",
" <td>40.719986</td>\n",
" <td>2</td>\n",
" <td>3.5</td>\n",
" <td>0.5</td>\n",
" <td>0.5</td>\n",
" <td>0.00</td>\n",
" <td>0.0</td>\n",
" <td>0.3</td>\n",
" <td>4.80</td>\n",
" </tr>\n",
" <tr>\n",
" <th>4</th>\n",
" <td>1</td>\n",
" <td>2015-01-10 20:33:39</td>\n",
" <td>2015-01-10 20:52:58</td>\n",
" <td>1</td>\n",
" <td>3.00</td>\n",
" <td>-73.971176</td>\n",
" <td>40.762428</td>\n",
" <td>1</td>\n",
" <td>N</td>\n",
" <td>-74.004181</td>\n",
" <td>40.742653</td>\n",
" <td>2</td>\n",
" <td>15.0</td>\n",
" <td>0.5</td>\n",
" <td>0.5</td>\n",
" <td>0.00</td>\n",
" <td>0.0</td>\n",
" <td>0.3</td>\n",
" <td>16.30</td>\n",
" </tr>\n",
" </tbody>\n",
"</table>\n",
"</div>"
],
"text/plain": [
" VendorID tpep_pickup_datetime tpep_dropoff_datetime passenger_count \\\n",
"0 2 2015-01-15 19:05:39 2015-01-15 19:23:42 1 \n",
"1 1 2015-01-10 20:33:38 2015-01-10 20:53:28 1 \n",
"2 1 2015-01-10 20:33:38 2015-01-10 20:43:41 1 \n",
"3 1 2015-01-10 20:33:39 2015-01-10 20:35:31 1 \n",
"4 1 2015-01-10 20:33:39 2015-01-10 20:52:58 1 \n",
"\n",
" trip_distance pickup_longitude pickup_latitude RateCodeID \\\n",
"0 1.59 -73.993896 40.750111 1 \n",
"1 3.30 -74.001648 40.724243 1 \n",
"2 1.80 -73.963341 40.802788 1 \n",
"3 0.50 -74.009087 40.713818 1 \n",
"4 3.00 -73.971176 40.762428 1 \n",
"\n",
" store_and_fwd_flag dropoff_longitude dropoff_latitude payment_type \\\n",
"0 N -73.974785 40.750618 1 \n",
"1 N -73.994415 40.759109 1 \n",
"2 N -73.951820 40.824413 2 \n",
"3 N -74.004326 40.719986 2 \n",
"4 N -74.004181 40.742653 2 \n",
"\n",
" fare_amount extra mta_tax tip_amount tolls_amount \\\n",
"0 12.0 1.0 0.5 3.25 0.0 \n",
"1 14.5 0.5 0.5 2.00 0.0 \n",
"2 9.5 0.5 0.5 0.00 0.0 \n",
"3 3.5 0.5 0.5 0.00 0.0 \n",
"4 15.0 0.5 0.5 0.00 0.0 \n",
"\n",
" improvement_surcharge total_amount \n",
"0 0.3 17.05 \n",
"1 0.3 17.80 \n",
"2 0.3 10.80 \n",
"3 0.3 4.80 \n",
"4 0.3 16.30 "
]
},
"execution_count": 16,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"df = dfs[0].result()\n",
"df.head()"
]
},
{
"cell_type": "code",
"execution_count": 17,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"Index(['VendorID', 'tpep_pickup_datetime', 'tpep_dropoff_datetime',\n",
" 'passenger_count', 'trip_distance', 'pickup_longitude',\n",
" 'pickup_latitude', 'RateCodeID', 'store_and_fwd_flag',\n",
" 'dropoff_longitude', 'dropoff_latitude', 'payment_type', 'fare_amount',\n",
" 'extra', 'mta_tax', 'tip_amount', 'tolls_amount',\n",
" 'improvement_surcharge', 'total_amount'],\n",
" dtype='object')"
]
},
"execution_count": 17,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"df.columns"
]
},
{
"cell_type": "code",
"execution_count": 18,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"df_train, df_test = train_test_split(df)"
]
},
{
"cell_type": "code",
"execution_count": 19,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"CPU times: user 1.02 s, sys: 40.5 ms, total: 1.06 s\n",
"Wall time: 1.06 s\n"
]
}
],
"source": [
"%%time\n",
"\n",
"columns = ['trip_distance', \n",
" 'pickup_longitude', 'pickup_latitude', \n",
" 'dropoff_longitude', 'dropoff_latitude',\n",
" 'payment_type', 'fare_amount', 'mta_tax', 'tip_amount', 'tolls_amount']\n",
"\n",
"est = RandomForestClassifier(n_estimators=1, max_depth=10)\n",
"est.fit(df_train[columns], df_train.passenger_count)"
]
},
{
"cell_type": "code",
"execution_count": 20,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"0.7056772815894532"
]
},
"execution_count": 20,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"est.score(df_test[columns], df_test.passenger_count)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Fit full dataset on cluster"
]
},
{
"cell_type": "code",
"execution_count": 21,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"CPU times: user 511 ms, sys: 34.5 ms, total: 545 ms\n",
"Wall time: 40.1 s\n"
]
}
],
"source": [
"%%time\n",
"\n",
"def fit(df):\n",
"    est = RandomForestClassifier(n_estimators=1)\n",
"    est.fit(df[columns], df.passenger_count)\n",
"    return est\n",
"\n",
"def score(est, x, y):\n",
"    return est.score(x, y)\n",
"\n",
"train_tests = e.map(train_test_split, dfs)\n",
"trains = e.map(lambda x: x[0], train_tests)\n",
"tests = e.map(lambda x: x[1], train_tests)\n",
"estimators = e.map(fit, trains)\n",
"# progress(estimators)\n",
"wait(estimators)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Unfortunately, we can't fit all of the estimators onto one machine to combine them\n",
"\n",
"So what should we be doing instead? Is there a way to select or merge good trees on the cluster? Should we be doing something completely different?"
]
},
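{
"cell_type": "markdown",
"metadata": {},
"source": [
"One possibility (a rough sketch, not run here) would be a tree reduction on the cluster: repeatedly merge pairs of estimators with a `combine` function like the one above, so that no single machine ever holds all of the trees at once:\n",
"\n",
"```python\n",
"# Hypothetical sketch: pairwise reduction of the estimator futures on the cluster\n",
"futures = list(estimators)\n",
"while len(futures) > 1:\n",
"    pairs = list(zip(futures[::2], futures[1::2]))\n",
"    odd = futures[2 * len(pairs):]  # carry any unpaired future forward\n",
"    futures = [e.submit(combine, list(pair)) for pair in pairs] + odd\n",
"combined = futures[0]  # future pointing to one merged forest\n",
"```\n",
"\n",
"This still moves all of the tree data over the network, but spreads the memory cost across the workers at each step."
]
},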
{
"cell_type": "code",
"execution_count": 22,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"import pickle\n",
"def nbytes(x):\n",
"    return len(pickle.dumps(x))"
]
},
{
"cell_type": "code",
"execution_count": 23,
"metadata": {
"collapsed": false,
"scrolled": true
},
"outputs": [
{
"data": {
"text/plain": [
"7047.665975"
]
},
"execution_count": 23,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"sum(e.gather(e.map(nbytes, estimators))) / 1e6"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.5.1"
}
},
"nbformat": 4,
"nbformat_minor": 0
}