Skip to content

Instantly share code, notes, and snippets.

@john-sandall
Last active December 10, 2016 22:35
Show Gist options
  • Save john-sandall/06706d8ccd2bfe7ee8674de46dcdb415 to your computer and use it in GitHub Desktop.
Save john-sandall/06706d8ccd2bfe7ee8674de46dcdb415 to your computer and use it in GitHub Desktop.
Multiprocessing + pandas examples
Display the source blob
Display the rendered blob
Raw
{
"cells": [
{
"cell_type": "code",
"execution_count": 1,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"import multiprocessing\n",
"from functools import partial\n",
"\n",
"import numpy as np\n",
"import pandas as pd"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Example #1: Double each number in a list of three, in parallel"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"[2, 4, 6]"
]
},
"execution_count": 2,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"def double(x):\n",
"    \"\"\"Return x * 2; pool.map below calls this once per input in worker processes.\"\"\"\n",
"    return x*2\n",
"\n",
"inputs = [1, 2, 3]\n",
"pool = multiprocessing.Pool()\n",
"try:\n",
"    result = pool.map(double, inputs)\n",
"finally:\n",
"    # Shut the worker processes down even if map() raises\n",
"    pool.close()\n",
"    pool.join()\n",
"result"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Example #2: Replace a for loop with a multiprocessing pool.map"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"All equal: True\n"
]
}
],
"source": [
"list_of_strings = [ # from https://github.com/minimaxir/big-list-of-naughty-strings/blob/master/blns.txt\n",
"    \"Scunthorpe General Hospital\",\n",
"    \"Penistone Community Church\",\n",
"    \"Lightwater Country Park\",\n",
"    \"Jimmy Clitheroe\",\n",
"    \"Horniman Museum\",\n",
"    \"shitake mushrooms\",\n",
"    \"RomansInSussex.co.uk\",\n",
"    \"http://www.cum.qc.ca/\",\n",
"    \"Craig Cockburn, Software Specialist\",\n",
"    \"Linda Callahan\",\n",
"    \"Dr. Herman I. Libshitz\",\n",
"    \"magna cum laude\",\n",
"    \"Super Bowl XXX\",\n",
"    \"medieval erection of parapets\",\n",
"    \"evaluate\",\n",
"    \"mocha\",\n",
"    \"expression\",\n",
"    \"Arsenal canal\",\n",
"    \"classic\",\n",
"    \"Tyson Gay\",\n",
"    \"basement\"\n",
"]\n",
"\n",
"\n",
"def contains_phrase(input_string, phrase):\n",
"    \"\"\"Return True if phrase occurs anywhere in input_string, ignoring case.\"\"\"\n",
"    # Lower-case both sides so an upper-case letter in phrase can still match\n",
"    return phrase.lower() in input_string.lower()\n",
"\n",
"\n",
"phrase = \"mushroom\"\n",
"\n",
"# as a for loop\n",
"for_loop_out = []\n",
"for row in list_of_strings:\n",
"    for_loop_out.append(contains_phrase(row, phrase))\n",
"\n",
"# using multiprocessing: partial binds phrase so pool.map gets a one-argument callable\n",
"contains_target_phrase = partial(contains_phrase, phrase=phrase)\n",
"\n",
"pool = multiprocessing.Pool(multiprocessing.cpu_count())\n",
"try:\n",
"    multiprocessing_out = pool.map(contains_target_phrase, list_of_strings)\n",
"finally:\n",
"    # Shut the worker processes down even if map() raises\n",
"    pool.close()\n",
"    pool.join()\n",
"\n",
"# A single parenthesised argument prints the same on Python 2 and 3\n",
"print(\"All equal: %s\" % (for_loop_out == multiprocessing_out))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Example #3: Parallelise a row-wise pandas apply with multiprocessing"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"# Fixed seed so Restart and Run All reproduces the same random frame\n",
"rng = np.random.RandomState(42)\n",
"n_rows = 100\n",
"random_data = {'a': rng.normal(0, 1, n_rows), 'b': rng.normal(0, 1, n_rows)}\n",
"df = pd.DataFrame(data=random_data, columns=['a', 'b'])\n",
"df['added_manually'] = df.a + df.b\n",
"df['added_with_apply'] = df.apply(lambda x: x['a'] + x['b'], axis=1)"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"def apply_add_rows(input_df):\n",
"    \"\"\"Row-by-row sum of columns 'a' and 'b' via DataFrame.apply(axis=1).\n",
"\n",
"    Written as a named def rather than a lambda because it is handed to\n",
"    pool.map, which must pickle the callable it sends to the workers.\n",
"    \"\"\"\n",
"    def add_a_and_b(row):\n",
"        return row['a'] + row['b']\n",
"\n",
"    return input_df.apply(add_a_and_b, axis=1)"
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"n_chunks = 10\n",
"# Split by row position into n_chunks near-equal pieces: works for any index\n",
"# (not only 0..N-1 integers), and n_chunks really is the number of chunks.\n",
"chunk_positions = np.array_split(np.arange(len(df)), n_chunks)\n",
"chunks = [df.iloc[positions] for positions in chunk_positions if len(positions)]\n",
"pool = multiprocessing.Pool(multiprocessing.cpu_count())\n",
"try:\n",
"    return_list = pool.map(apply_add_rows, chunks)\n",
"finally:\n",
"    # Shut the worker processes down even if a chunk raises\n",
"    pool.close()\n",
"    pool.join()\n",
"df['added_with_multiprocess_apply'] = pd.concat(return_list)"
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"def apply_parallel(input_df, some_function, n_chunks):\n",
"    \"\"\"Apply some_function to input_df chunk by chunk in a multiprocessing pool.\n",
"\n",
"    input_df is split by row position into at most n_chunks near-equal,\n",
"    non-empty pieces. Each piece is passed to some_function in a worker\n",
"    process, and the results are concatenated in the original row order.\n",
"\n",
"    Parameters\n",
"    ----------\n",
"    input_df : pandas.DataFrame\n",
"        Frame to process. Any index works because splitting is positional.\n",
"    some_function : callable\n",
"        Takes one DataFrame chunk and returns a pandas object. pool.map must\n",
"        pickle it, so use a named def, not a lambda. NOTE(review): functions\n",
"        defined in a notebook presumably only reach workers under the 'fork'\n",
"        start method -- confirm on Windows / recent macOS.\n",
"    n_chunks : int\n",
"        Number of chunks to split into (must be >= 1).\n",
"\n",
"    Returns\n",
"    -------\n",
"    pandas object\n",
"        pd.concat of the per-chunk results. For an empty input_df,\n",
"        some_function(input_df) is returned directly.\n",
"    \"\"\"\n",
"    chunk_positions = np.array_split(np.arange(len(input_df)), n_chunks)\n",
"    chunks = [input_df.iloc[positions] for positions in chunk_positions if len(positions)]\n",
"    if not chunks:\n",
"        # Empty frame: nothing to distribute, and pd.concat([]) would raise\n",
"        return some_function(input_df)\n",
"    # Never start more worker processes than there are chunks\n",
"    pool = multiprocessing.Pool(min(multiprocessing.cpu_count(), len(chunks)))\n",
"    try:\n",
"        return_list = pool.map(some_function, chunks)\n",
"    finally:\n",
"        # Release worker processes even if some_function raises\n",
"        pool.close()\n",
"        pool.join()\n",
"    return pd.concat(return_list)"
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"# Same computation as the manual chunked version, via the reusable helper\n",
"functionalised_result = apply_parallel(df, apply_add_rows, n_chunks=10)\n",
"df['added_with_functionalised_mp_apply'] = functionalised_result"
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"All good!\n"
]
}
],
"source": [
"# Each parallel variant must reproduce the vectorised sum exactly (zero mismatched rows)\n",
"for column in ['added_with_apply', 'added_with_multiprocess_apply', 'added_with_functionalised_mp_apply']:\n",
"    n_mismatches = (df.added_manually != df[column]).sum()\n",
"    assert n_mismatches == 0, '%s differs from added_manually in %d rows' % (column, n_mismatches)\n",
"# A single parenthesised argument prints the same on Python 2 and 3\n",
"print(\"All good!\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Next steps: consider Dask\n",
"Dask offers parallel, partitioned DataFrames that could replace the hand-rolled chunking above: https://docs.dask.org/en/latest/"
]
}
],
"metadata": {
"anaconda-cloud": {},
"kernelspec": {
"display_name": "Python [conda root]",
"language": "python",
"name": "conda-root-py"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 2
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython2",
"version": "2.7.12"
}
},
"nbformat": 4,
"nbformat_minor": 1
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment