@driscoll
Created November 11, 2014 21:30
Parallel imap for IPython Notebook
{
"metadata": {
"name": "",
"signature": "sha256:1594a936e163d29d2841661010edb925c1e1a47dc04c1ef79fb6ae5f17ce8ecd"
},
"nbformat": 3,
"nbformat_minor": 0,
"worksheets": [
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Parallel imap experiment"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The purpose of this script is to create a generic wrapper that approximates the behavior of multiprocessing.Pool.imap_unordered using the standard methods included with IPython.parallel.\n",
"\n",
"One nice feature of imap_unordered is that it does not load all of the input data into memory at once. As a result, it's possible to grind over a large dataset by passing along a generator.\n",
"\n",
"Unfortunately, multiprocessing doesn't play well with IPython Notebook, and none of the methods in IPython.parallel takes a generator without first loading all of the data into memory.\n",
"\n",
"Vincent raised this problem on Stack Overflow last year, and minrk offered a really nice example:\n",
"\n",
"* [Vincent's original question](http://stackoverflow.com/questions/18135060/how-to-use-ipython-parallel-map-with-generators-as-input-to-function)\n",
"* [minrk's example solution](http://nbviewer.ipython.org/gist/minrk/6203173)\n",
"\n",
"I think we can take it one step further and make an abstract wrapper that can be used anywhere you might otherwise have used multiprocessing.Pool.imap_unordered. This example is about halfway there: the wrapper below accepts a generator, but it yields results in input order, one batch at a time, rather than as each task completes."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Setting up the environment"
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"from IPython.parallel import Client\n",
"client = Client()\n",
"print client.ids"
],
"language": "python",
"metadata": {},
"outputs": [
{
"output_type": "stream",
"stream": "stdout",
"text": [
"[0, 1, 2, 3]\n"
]
}
],
"prompt_number": 174
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"lbview = client.load_balanced_view()"
],
"language": "python",
"metadata": {},
"outputs": [],
"prompt_number": 175
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Example functions and data to play with"
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"def iter_lines(datafile):\n",
"    for line in datafile:\n",
"        yield line.strip()"
],
"language": "python",
"metadata": {},
"outputs": [],
"prompt_number": 176
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"def tokenize(s):\n",
"    return sorted(set(s.lower().split()))"
],
"language": "python",
"metadata": {},
"outputs": [],
"prompt_number": 177
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"!wget http://www.gutenberg.org/cache/epub/25439/pg25439.txt\n",
"!mv pg25439.txt bellamy-looking_backward-1887.txt\n",
"!wc -l bellamy-looking_backward-1887.txt\n",
"fn = \"bellamy-looking_backward-1887.txt\""
],
"language": "python",
"metadata": {},
"outputs": [
{
"output_type": "stream",
"stream": "stdout",
"text": [
"--2014-11-11 16:21:57-- http://www.gutenberg.org/cache/epub/25439/pg25439.txt\r\n",
"Resolving www.gutenberg.org (www.gutenberg.org)... "
]
},
{
"output_type": "stream",
"stream": "stdout",
"text": [
"152.19.134.47\r\n",
"Connecting to www.gutenberg.org (www.gutenberg.org)|152.19.134.47|:80... connected.\r\n",
"HTTP request sent, awaiting response... "
]
},
{
"output_type": "stream",
"stream": "stdout",
"text": [
"200 OK\r\n",
"Length: 496005 (484K) [text/plain]\r\n",
"Saving to: `pg25439.txt'\r\n",
"\r\n",
"\r",
" 0% [ ] 0 --.-K/s "
]
},
{
"output_type": "stream",
"stream": "stdout",
"text": [
"\r",
"100%[======================================>] 496,005 --.-K/s in 0.1s \r\n",
"\r\n",
"2014-11-11 16:21:58 (3.34 MB/s) - `pg25439.txt' saved [496005/496005]\r\n",
"\r\n"
]
},
{
"output_type": "stream",
"stream": "stdout",
"text": [
"8841 bellamy-looking_backward-1887.txt\r\n"
]
}
],
"prompt_number": 178
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Single core map"
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"%%time\n",
"with open(fn, \"rb\") as f:\n",
"    tokens = map(tokenize, iter_lines(f))"
],
"language": "python",
"metadata": {},
"outputs": [
{
"output_type": "stream",
"stream": "stdout",
"text": [
"CPU times: user 24 ms, sys: 24 ms, total: 48 ms\n",
"Wall time: 53.2 ms\n"
]
}
],
"prompt_number": 179
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Parallel map using load balancer"
]
},
{
"cell_type": "code",
"collapsed": true,
"input": [
"%%time\n",
"with open(fn, \"rb\") as f:\n",
"    results = lbview.map(tokenize, iter_lines(f))\n",
"parallel_tokens = results.get()"
],
"language": "python",
"metadata": {},
"outputs": [
{
"output_type": "stream",
"stream": "stdout",
"text": [
"CPU times: user 8.7 s, sys: 4.19 s, total: 12.9 s\n",
"Wall time: 30.6 s\n"
]
}
],
"prompt_number": 180
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"tokens == parallel_tokens"
],
"language": "python",
"metadata": {},
"outputs": [
{
"metadata": {},
"output_type": "pyout",
"prompt_number": 181,
"text": [
"True"
]
}
],
"prompt_number": 181
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Parallel imap taking input from a generator"
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"def imap(function, generator, view,\n",
"         preprocessor=iter, chunksize=256):\n",
"    num_cores = len(view.client.ids)\n",
"    batch_size = chunksize * num_cores\n",
"    queue = []\n",
"    for n in preprocessor(generator):\n",
"        queue.append(n)\n",
"        # Dispatch once a full batch has accumulated\n",
"        if len(queue) >= batch_size:\n",
"            for result in view.map(function, queue):\n",
"                yield result\n",
"            queue = []\n",
"    # Flush any leftover items; skip the call if nothing remains\n",
"    if queue:\n",
"        for result in view.map(function, queue):\n",
"            yield result"
],
"language": "python",
"metadata": {},
"outputs": [],
"prompt_number": 182
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"%%time\n",
"with open(fn, \"rb\") as f:\n",
"    imap_tokens = []\n",
"    for result in imap(tokenize, f, lbview, iter_lines, 128):\n",
"        imap_tokens.append(result)"
],
"language": "python",
"metadata": {},
"outputs": [
{
"output_type": "stream",
"stream": "stdout",
"text": [
"CPU times: user 10.4 s, sys: 3.15 s, total: 13.6 s\n",
"Wall time: 21.4 s\n"
]
}
],
"prompt_number": 183
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"parallel_tokens == imap_tokens"
],
"language": "python",
"metadata": {},
"outputs": [
{
"metadata": {},
"output_type": "pyout",
"prompt_number": 184,
"text": [
"True"
]
}
],
"prompt_number": 184
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Vincent's original problem"
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"from itertools import product"
],
"language": "python",
"metadata": {},
"outputs": [],
"prompt_number": 185
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"def stringcount((longstring, substrings)):\n",
"    scount = [longstring.count(s) for s in substrings]\n",
"    return (longstring, substrings, scount)"
],
"language": "python",
"metadata": {},
"outputs": [],
"prompt_number": 186
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"def gen_pairs(long_string, sub_strings):\n",
"    for l in long_string:\n",
"        s = sub_strings.next()\n",
"        yield (l, s)"
],
"language": "python",
"metadata": {},
"outputs": [],
"prompt_number": 187
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Single core map"
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"longstring = product('abc', repeat=3)\n",
"substrings = product('abc', repeat=2)"
],
"language": "python",
"metadata": {},
"outputs": [],
"prompt_number": 197
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"%%time\n",
"for x in map(stringcount, gen_pairs(longstring, substrings)):\n",
" print x"
],
"language": "python",
"metadata": {},
"outputs": [
{
"output_type": "stream",
"stream": "stdout",
"text": [
"(('a', 'a', 'a'), ('a', 'a'), [3, 3])\n",
"(('a', 'a', 'b'), ('a', 'b'), [2, 1])\n",
"(('a', 'a', 'c'), ('a', 'c'), [2, 1])\n",
"(('a', 'b', 'a'), ('b', 'a'), [1, 2])\n",
"(('a', 'b', 'b'), ('b', 'b'), [2, 2])\n",
"(('a', 'b', 'c'), ('b', 'c'), [1, 1])\n",
"(('a', 'c', 'a'), ('c', 'a'), [1, 2])\n",
"(('a', 'c', 'b'), ('c', 'b'), [1, 1])\n",
"(('a', 'c', 'c'), ('c', 'c'), [2, 2])\n",
"CPU times: user 0 ns, sys: 0 ns, total: 0 ns\n",
"Wall time: 549 \u00b5s\n"
]
}
],
"prompt_number": 198
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Parallel imap"
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"longstring = product('abc', repeat=3)\n",
"substrings = product('abc', repeat=2)"
],
"language": "python",
"metadata": {},
"outputs": [],
"prompt_number": 199
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"%%time\n",
"for x in imap(stringcount, gen_pairs(longstring, substrings), lbview):\n",
" print x"
],
"language": "python",
"metadata": {},
"outputs": [
{
"output_type": "stream",
"stream": "stdout",
"text": [
"(('a', 'a', 'a'), ('a', 'a'), [3, 3])\n",
"(('a', 'a', 'b'), ('a', 'b'), [2, 1])\n",
"(('a', 'a', 'c'), ('a', 'c'), [2, 1])\n",
"(('a', 'b', 'a'), ('b', 'a'), [1, 2])\n",
"(('a', 'b', 'b'), ('b', 'b'), [2, 2])\n",
"(('a', 'b', 'c'), ('b', 'c'), [1, 1])\n",
"(('a', 'c', 'a'), ('c', 'a'), [1, 2])\n",
"(('a', 'c', 'b'), ('c', 'b'), [1, 1])\n",
"(('a', 'c', 'c'), ('c', 'c'), [2, 2])\n",
"CPU times: user 16 ms, sys: 4 ms, total: 20 ms\n",
"Wall time: 32.8 ms\n"
]
}
],
"prompt_number": 200
}
],
"metadata": {}
}
]
}
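
The batching idea behind the notebook's `imap` wrapper can be tried without a running IPython cluster. The following is a minimal sketch, not the notebook's own code: it is written for Python 3 (the notebook itself is Python 2), the name `chunked_imap` and the `map_fn` parameter are hypothetical, and `map_fn` stands in for a parallel map such as `lbview.map`. It assumes only that `map_fn(function, list)` returns an iterable of results in input order.

```python
from itertools import islice


def chunked_imap(function, iterable, map_fn=map, chunksize=256):
    """Lazily apply `function` to `iterable`, one chunk at a time.

    `map_fn` is a hypothetical stand-in for a parallel map; the built-in
    `map` is used by default so the sketch runs on a single core.
    """
    iterator = iter(iterable)
    while True:
        # Pull at most `chunksize` items; only this chunk is held in memory.
        chunk = list(islice(iterator, chunksize))
        if not chunk:
            break
        for result in map_fn(function, chunk):
            yield result


def tokenize(s):
    # Same helper as in the notebook: sorted unique lowercase tokens.
    return sorted(set(s.lower().split()))


lines = (line for line in ["The cat", "the THE dog", "a b a"])
tokens = list(chunked_imap(tokenize, lines, chunksize=2))
print(tokens)
```

Because each chunk is drained with `islice` only when the previous chunk's results have been yielded, the input generator is never consumed further ahead than one chunk. The trade-off is the same as in the notebook's wrapper: the workers sit idle between chunks, so a larger `chunksize` improves throughput at the cost of memory.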