Skip to content

Instantly share code, notes, and snippets.

@mrocklin
Created April 6, 2017 13:21
Show Gist options
  • Save mrocklin/cdd04293859764c542ccf547f81ab7a5 to your computer and use it in GitHub Desktop.
Display the source blob
Display the rendered blob
Raw
{
"cells": [
{
"cell_type": "code",
"execution_count": 1,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"<Client: scheduler='tcp://localhost:8786' processes=7 cores=56>"
]
},
"execution_count": 1,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"from dask import persist\n",
"from dask.distributed import Client, progress\n",
"\n",
"# Connect to the running scheduler and clear any state left over from\n",
"# previous runs so timings below start from a clean cluster.\n",
"client = Client('localhost:8786')\n",
"client.restart()"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"import dask\n",
"import dask.array as da\n",
"import numpy as np\n",
"from scipy.optimize import fmin_l_bfgs_b\n",
"from sklearn import datasets\n",
"from sklearn.linear_model import LogisticRegression\n",
"\n",
"# make dataset (10M samples x 20 features, float64 -> ~1.6 GB)\n",
"X, y = datasets.make_classification(n_classes=2, n_samples=10000000)\n",
"\n",
"# create dask implementation, chunked in blocks of 1M rows\n",
"Xda = da.from_array(X, 1000000)\n",
"yda = da.from_array(y, 1000000)\n",
"\n",
"Xda, yda = dask.persist(Xda, yda)\n",
"\n",
"# logistic\n",
"def sigmoid(x):\n",
"    '''Sigmoid function of x.'''\n",
"    return 1 / (1 + da.exp(-x))\n",
"\n",
"def compute_logistic_loss_grad(beta, X, y):\n",
"    '''Return (loss, gradient) of unregularized logistic regression at beta.\n",
"\n",
"    Loss is sum(log(1 + exp(X.beta))) - y . (X.beta); both the loss and the\n",
"    gradient graphs are computed in a single dask.compute pass.\n",
"    '''\n",
"    Xbeta = X.dot(beta)\n",
"    # Numerically stable softplus: log1p(exp(t)) overflows for large t,\n",
"    # so use the identity log(1 + e^t) = max(t, 0) + log1p(e^-|t|).\n",
"    softplus = da.maximum(Xbeta, 0) + da.log1p(da.exp(-da.fabs(Xbeta)))\n",
"    loss_fn = softplus.sum() - da.dot(y, Xbeta)\n",
"    # gradient: X^T (sigmoid(X.beta) - y)\n",
"    probs = sigmoid(Xbeta)\n",
"    gradient_fn = da.dot(X.T, probs - y)\n",
"    loss, gradient = dask.compute(loss_fn, gradient_fn)\n",
"    # copy() detaches the gradient from any dask-owned buffer before\n",
"    # handing it to the L-BFGS optimizer, which mutates it in place.\n",
"    return loss, gradient.copy()\n",
"\n",
"n, p = X.shape\n",
"beta = np.zeros(p)\n"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"from distributed import wait"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"# Balance the persisted data across the cluster, duplicate it 10x\n",
"# (~1.6 GB -> ~16 GB), then rebalance the enlarged arrays.\n",
"# NOTE(review): this cell rebinds Xda/yda in place, so re-running it\n",
"# duplicates the data again -- run once per kernel restart.\n",
"\n",
"wait([Xda, yda])\n",
"client.rebalance([Xda, yda])\n",
"\n",
"Xda = da.concatenate([Xda] * 10, axis=0).persist()\n",
"yda = da.concatenate([yda] * 10, axis=0).persist()\n",
"\n",
"wait([Xda, yda])\n",
"client.rebalance([Xda, yda])"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"dask.array<concatenate, shape=(100000000, 20), dtype=float64, chunksize=(1000000, 20)>"
]
},
"execution_count": 5,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"Xda"
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"16000000000"
]
},
"execution_count": 6,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"Xda.nbytes"
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"/opt/anaconda/lib/python3.5/site-packages/dask/array/core.py:476: RuntimeWarning: overflow encountered in true_divide\n",
" o = func(*args, **kwargs)\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"CPU times: user 1.46 s, sys: 552 µs, total: 1.46 s\n",
"Wall time: 9.98 s\n"
]
}
],
"source": [
"%%time\n",
"# Graph-fusion optimization slows this workload down, so disable it.\n",
"# (dask.set_options is the config API for the dask version used here.)\n",
"with dask.set_options(fuse_ave_width=0):\n",
"    new_beta_dask, loss_dask, info_dask = fmin_l_bfgs_b(\n",
"        compute_logistic_loss_grad, beta, fprime=None,\n",
"        args=(Xda, yda),\n",
"        iprint=0, pgtol=1e-14, maxiter=10)\n"
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"CPU times: user 25.6 s, sys: 2.24 s, total: 27.8 s\n",
"Wall time: 14.1 s\n"
]
}
],
"source": [
"%%time\n",
"# Baseline: scikit-learn logistic regression with effectively no\n",
"# regularization (large C), same L-BFGS optimizer, same iteration cap.\n",
"# Runs on the smaller in-memory 1.6 GB dataset (X, y), not the 16 GB copy.\n",
"lr = LogisticRegression(solver=\"lbfgs\", C=1e5, fit_intercept=False, max_iter=10)\n",
"lr.fit(X, y)\n"
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"[[ -7.18210521e-05 1.48211054e+00 8.59712361e-04 -5.36094181e-01\n",
" -4.73116035e-04 5.15202678e-04 7.14100165e-04 -7.67761494e-04\n",
" -2.20663555e-03 6.40228731e-01 2.15719078e-03 1.11574985e-03\n",
" 3.01840479e-03 -8.91871490e-05 -1.20989408e-04 -7.35972279e-04\n",
" -2.26868882e-04 -2.03733770e-03 -1.12063873e+00 -4.72233441e-04]]\n",
"[ -7.18210522e-05 1.48211054e+00 8.59712361e-04 -5.36094181e-01\n",
" -4.73116035e-04 5.15202679e-04 7.14100165e-04 -7.67761494e-04\n",
" -2.20663555e-03 6.40228731e-01 2.15719078e-03 1.11574985e-03\n",
" 3.01840479e-03 -8.91871490e-05 -1.20989408e-04 -7.35972279e-04\n",
" -2.26868882e-04 -2.03733770e-03 -1.12063873e+00 -4.72233441e-04]\n"
]
}
],
"source": [
"print(lr.coef_)\n",
"print(new_beta_dask)\n"
]
}
],
"metadata": {
"anaconda-cloud": {},
"kernelspec": {
"display_name": "Python [conda root]",
"language": "python",
"name": "conda-root-py"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.5.2"
}
},
"nbformat": 4,
"nbformat_minor": 1
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment