@robinovitch61
Last active October 30, 2019 17:20
multiprocessing.ipynb
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Multiprocessing\n",
"***\n",
"***"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Threading vs. Multiprocessing:\n",
"* Before, we saw that `threading` degraded performance for CPU-bound operations: under CPython's global interpreter lock (GIL), threads don't execute Python code simultaneously, and creating and coordinating them adds overhead\n",
"* `multiprocessing` runs work in separate processes, which allows truly parallel operation, and its API is very similar to that of `threading`\n",
" \n",
"*Note: don't use multiprocessing for IO-bound operations; threads can overlap their waiting on IO there, and they have less overhead than processes*"
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [],
"source": [
"# Same as before (add_to_queue is unused in this notebook):\n",
"def sum_range(end, start=0, add_to_queue=True):\n",
"    print(\"Summing from {} to {}\".format(start, end))\n",
"    assert start < end\n",
"\n",
"    total = 0\n",
"    count = start\n",
"    while count < end:\n",
"        total += count\n",
"        count += 1\n",
"    return total"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [],
"source": [
"# N_THREADS is renamed N_THINGS, since the workers may now be threads or processes\n",
"SUM_TO = int(5e7)\n",
"N_THINGS = 10"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Sequential case:"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Summing from 0 to 50000000\n",
"CPU times: user 6.05 s, sys: 0 ns, total: 6.05 s\n",
"Wall time: 6.02 s\n"
]
},
{
"data": {
"text/plain": [
"1249999975000000"
]
},
"execution_count": 3,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"%%time\n",
"sum_range(SUM_TO)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Threaded as before:\n",
"* Note that here we use a `Queue`, a thread-safe object, to collect the result from each thread."
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Summing from 0 to 5000000\n",
"Summing from 5000000 to 10000000\n",
"Summing from 10000000 to 15000000\n",
"Summing from 15000000 to 20000000\n",
"Summing from 20000000 to 25000000\n",
"Summing from 25000000 to 30000000\n",
"Summing from 30000000 to 35000000\n",
"Summing from 35000000 to 40000000Summing from 40000000 to 45000000\n",
"\n",
"Summing from 45000000 to 50000000\n",
"CPU times: user 6.01 s, sys: 120 ms, total: 6.13 s\n",
"Wall time: 5.93 s\n"
]
},
{
"data": {
"text/plain": [
"1249999975000000"
]
},
"execution_count": 4,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"%%time\n",
"import threading\n",
"\n",
"# A Queue is a thread-safe object, i.e. it implements the\n",
"# required locking semantics such that communication can\n",
"# occur safely between threads. It operates under the\n",
"# FIFO (first in, first out) principle\n",
"from queue import Queue\n",
"q = Queue()\n",
"\n",
"def sum_threaded():\n",
"\n",
"    step = SUM_TO // N_THINGS\n",
"\n",
"    def sum_to_queue(end, start):\n",
"        '''Helper function\n",
"        to put result in queue'''\n",
"        q.put(sum_range(end, start))\n",
"\n",
"    threads = []\n",
"    for ii in range(N_THINGS):\n",
"        start = ii * step\n",
"        end = (ii + 1) * step\n",
"        t = threading.Thread(\n",
"            target=sum_to_queue,\n",
"            args=(end, start)\n",
"        )\n",
"        t.start()\n",
"        threads.append(t)\n",
"\n",
"    # ensure function only returns once all threads completed\n",
"    for t in threads:\n",
"        t.join()\n",
"\n",
"sum_threaded()\n",
"\n",
"# there will be exactly N_THINGS items in queue\n",
"sum([q.get() for _ in range(N_THINGS)])"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Multiprocessing:"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Here is what changed relative to the threaded version above:\n",
"* `import threading` became `import multiprocessing`\n",
"* `from queue import Queue` became `from multiprocessing import Queue`\n",
"* `threading.Thread` became `multiprocessing.Process`\n",
"* variable and function names were updated to match\n",
" \n",
"**Note that the multiprocessing version below is ~3x faster than threading (1.87 s vs. 5.93 s wall time)!**"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Summing from 0 to 5000000\n",
"Summing from 5000000 to 10000000\n",
"Summing from 15000000 to 20000000\n",
"Summing from 10000000 to 15000000\n",
"Summing from 25000000 to 30000000\n",
"Summing from 20000000 to 25000000\n",
"Summing from 30000000 to 35000000\n",
"Summing from 35000000 to 40000000\n",
"Summing from 40000000 to 45000000\n",
"Summing from 45000000 to 50000000\n",
"CPU times: user 40 ms, sys: 60 ms, total: 100 ms\n",
"Wall time: 1.87 s\n"
]
},
{
"data": {
"text/plain": [
"1249999975000000"
]
},
"execution_count": 5,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"%%time\n",
"import multiprocessing\n",
"\n",
"# A multiprocessing Queue is process-safe, i.e. it implements\n",
"# the required locking semantics such that communication can\n",
"# occur safely between processes. It operates under the\n",
"# FIFO (first in, first out) principle\n",
"from multiprocessing import Queue\n",
"q = Queue()\n",
"\n",
"def sum_multiprocess():\n",
"\n",
"    step = SUM_TO // N_THINGS\n",
"\n",
"    # A nested target works with the 'fork' start method; under\n",
"    # 'spawn' the target must be picklable (module-level)\n",
"    def sum_to_queue(end, start):\n",
"        '''Helper function\n",
"        to put result in queue'''\n",
"        q.put(sum_range(end, start))\n",
"\n",
"    processes = []\n",
"    for ii in range(N_THINGS):\n",
"        start = ii * step\n",
"        end = (ii + 1) * step\n",
"        p = multiprocessing.Process(\n",
"            target=sum_to_queue,\n",
"            args=(end, start)\n",
"        )\n",
"        p.start()\n",
"        processes.append(p)\n",
"\n",
"    # ensure function only returns once all processes completed\n",
"    for p in processes:\n",
"        p.join()\n",
"\n",
"sum_multiprocess()\n",
"\n",
"# there will be exactly N_THINGS items in queue\n",
"sum([q.get() for _ in range(N_THINGS)])"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Process pool:"
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Summing from 0 to 5000000\n",
"Summing from 5000000 to 10000000\n",
"Summing from 20000000 to 25000000\n",
"Summing from 25000000 to 30000000\n",
"Summing from 15000000 to 20000000\n",
"Summing from 35000000 to 40000000\n",
"Summing from 10000000 to 15000000\n",
"Summing from 30000000 to 35000000\n",
"Summing from 40000000 to 45000000\n",
"Summing from 45000000 to 50000000\n",
"CPU times: user 20 ms, sys: 60 ms, total: 80 ms\n",
"Wall time: 2.29 s\n"
]
},
{
"data": {
"text/plain": [
"1249999975000000"
]
},
"execution_count": 6,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"%%time\n",
"import multiprocessing\n",
"\n",
"# No Queue is needed here: pool.map collects the return\n",
"# values from the workers for us\n",
"\n",
"# Note that the target function must be defined at module\n",
"# level: Pool pickles it to send to the workers, so it cannot\n",
"# be a nested function (as before) or a lambda\n",
"def get_sum(x):\n",
"    return sum_range(*x)\n",
"\n",
"def sum_pool():\n",
"\n",
"    step = SUM_TO // N_THINGS\n",
"\n",
"    ranges = []\n",
"    for ii in range(N_THINGS):\n",
"        start = ii * step\n",
"        end = (ii + 1) * step\n",
"        ranges.append((end, start))\n",
"\n",
"    # uses 8 worker processes\n",
"    with multiprocessing.Pool(processes=8) as pool:\n",
"        # map blocks until all results are ready and returns a list\n",
"        results = pool.map(\n",
"            get_sum,\n",
"            ranges\n",
"        )\n",
"\n",
"    return sum(results)\n",
"\n",
"sum_pool()"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "py37",
"language": "python",
"name": "py37"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.7.4"
}
},
"nbformat": 4,
"nbformat_minor": 4
}