@robinovitch61
Last active October 30, 2019 00:25
Multithreading in Python
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Threading and the CPython Global Interpreter Lock (GIL)\n",
"***\n",
"***"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Threads hold the GIL during CPU operations (compute)\n",
"* The GIL lets only one thread execute Python bytecode at a time, so threads doing CPU-intensive work take turns instead of running in parallel"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Threads cede control during IO\n",
"* [The GIL is released during operations that don't need to call back to the CPython runtime](https://stackoverflow.com/questions/29270818/why-is-a-python-i-o-bound-task-not-blocked-by-the-gil), such as waiting on blocking IO, so other threads can run in the meantime"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"***\n",
"## Compute with Threading Degrades Performance"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Take a function that sums the integers from `start` (inclusive) to `end` (exclusive):"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def sum_range(end, start=0):\n",
"    print(\"Summing from {} to {}\".format(start, end))\n",
"    assert start < end\n",
"\n",
"    total = 0\n",
"    count = start\n",
"    while count < end:\n",
"        total += count\n",
"        count += 1\n",
"    return total"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"And let's choose some arbitrary large number to sum to and some number of threads:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"SUM_TO = int(5e7)\n",
"N_THREADS = 10"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The single threaded approach is simple:"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Summing from 0 to 50000000\n",
"CPU times: user 5.68 s, sys: 10 ms, total: 5.69 s\n",
"Wall time: 5.71 s\n"
]
},
{
"data": {
"text/plain": [
"1249999975000000"
]
},
"execution_count": 3,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"%%time\n",
"sum_range(SUM_TO)"
]
},
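{
"cell_type": "markdown",
"metadata": {},
"source": [
"**The threaded approach is a bit slower (6.18 s vs. 5.71 s wall time here): the GIL prevents threads from executing Python bytecode in parallel, and creating and switching between threads adds overhead. It actually gets worse with more threads!**"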
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Summing from 0 to 5000000\n",
"Summing from 5000000 to 10000000\n",
"Summing from 10000000 to 15000000\n",
"Summing from 15000000 to 20000000\n",
"Summing from 20000000 to 25000000Summing from 25000000 to 30000000\n",
"\n",
"Summing from 30000000 to 35000000\n",
"Summing from 35000000 to 40000000\n",
"Summing from 40000000 to 45000000\n",
"Summing from 45000000 to 50000000\n",
"CPU times: user 6.31 s, sys: 120 ms, total: 6.43 s\n",
"Wall time: 6.18 s\n"
]
},
{
"data": {
"text/plain": [
"1249999975000000"
]
},
"execution_count": 4,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"%%time\n",
"import threading\n",
"\n",
"# A Queue is a thread-safe object, i.e. it implements the\n",
"# required locking semantics so that threads can communicate\n",
"# safely. It operates on the FIFO (first in, first out)\n",
"# principle\n",
"from queue import Queue\n",
"q = Queue()\n",
"\n",
"def sum_threaded():\n",
"\n",
"    step = SUM_TO // N_THREADS\n",
"\n",
"    def sum_to_queue(end, start):\n",
"        '''Helper function\n",
"        to put result in queue'''\n",
"        q.put(sum_range(end, start))\n",
"\n",
"    threads = []\n",
"    for ii in range(N_THREADS):\n",
"        start = ii * step\n",
"        end = (ii + 1) * step\n",
"        t = threading.Thread(\n",
"            target=sum_to_queue,\n",
"            args=(end, start)\n",
"        )\n",
"        t.start()\n",
"        threads.append(t)\n",
"\n",
"    # ensure function only returns once all threads completed\n",
"    for t in threads:\n",
"        t.join()\n",
"\n",
"sum_threaded()\n",
"\n",
"# there will be exactly N_THREADS items in queue\n",
"sum([q.get() for _ in range(N_THREADS)])"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Another way to do threading is `concurrent.futures.ThreadPoolExecutor`. It is equivalent to the threaded approach above, but lets you set the number of threads independently of the number of computations you want to perform. This is helpful if you don't want to, say, start 10,000 threads in a single process:"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Summing from 0 to 5000000\n",
"Summing from 5000000 to 10000000Summing from 10000000 to 15000000\n",
"\n",
"Summing from 15000000 to 20000000\n",
"Summing from 20000000 to 25000000\n",
"Summing from 25000000 to 30000000\n",
"Summing from 30000000 to 35000000\n",
"Summing from 35000000 to 40000000\n",
"Summing from 40000000 to 45000000\n",
"Summing from 45000000 to 50000000\n",
"CPU times: user 6.69 s, sys: 130 ms, total: 6.82 s\n",
"Wall time: 6.63 s\n"
]
},
{
"data": {
"text/plain": [
"1249999975000000"
]
},
"execution_count": 5,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"%%time\n",
"from concurrent.futures import ThreadPoolExecutor\n",
"\n",
"def sum_threaded2():\n",
"\n",
"    step = SUM_TO // N_THREADS\n",
"\n",
"    ranges = []\n",
"    for ii in range(N_THREADS):\n",
"        start = ii * step\n",
"        end = (ii + 1) * step\n",
"        ranges.append((end, start))\n",
"\n",
"    # uses 8 threads\n",
"    with ThreadPoolExecutor(max_workers=8) as executor:\n",
"        # returns a lazy iterator over the results, in input order\n",
"        results = executor.map(\n",
"            lambda x: sum_range(*x),\n",
"            ranges\n",
"        )\n",
"\n",
"    return sum(results)\n",
"\n",
"sum_threaded2()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"***\n",
"## IO with Threading Improves Performance"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Let's get all the current internal URLs from leorobinovitch.com:"
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"['http://leorobinovitch.com/',\n",
" 'http://leorobinovitch.com/2019/09/25/recurse-00-(it-begins).html',\n",
" 'http://leorobinovitch.com/2019/09/27/jupyter-with-docker.html',\n",
" 'http://leorobinovitch.com/2019/10/04/recurse-01-(logic-immunity).html',\n",
" 'http://leorobinovitch.com/2019/10/11/recurse-02-(R-wavey).html',\n",
" 'http://leorobinovitch.com/2019/10/18/linear-regression-theory.html',\n",
" 'http://leorobinovitch.com/2019/10/18/recurse-03-(regressing-linearly).html',\n",
" 'http://leorobinovitch.com/2019/10/25/statistical-bias.html',\n",
" 'http://leorobinovitch.com/2019/10/26/recurse-04-(all-the-wrong-type).html',\n",
" 'http://leorobinovitch.com/about.html',\n",
" 'http://leorobinovitch.com/resume.html']"
]
},
"execution_count": 6,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"import urllib.request\n",
"# pip install beautifulsoup4 if not available\n",
"from bs4 import BeautifulSoup\n",
"\n",
"def get_html(url):\n",
"    return str(urllib.request.urlopen(url).read(), 'utf-8')\n",
"\n",
"def get_all_urls(url, require_in_url=None):\n",
"    all_urls = []\n",
"\n",
"    page = get_html(url)\n",
"    # name the parser explicitly to avoid bs4's \"no parser specified\" warning\n",
"    soup = BeautifulSoup(page, 'html.parser')\n",
"    links = soup.find_all('a', href=True)\n",
"\n",
"    for anchor in links:\n",
"        this_url = anchor['href']\n",
"        # treat anything not starting with 'http' as a relative link\n",
"        if this_url.strip()[:4] != 'http':\n",
"            this_url = \"\".join([url, this_url])\n",
"\n",
"        if require_in_url and require_in_url not in this_url:\n",
"            continue\n",
"        all_urls.append(this_url)\n",
"\n",
"    return sorted(set(all_urls))\n",
"\n",
"urls = get_all_urls(\n",
"    'http://leorobinovitch.com',\n",
"    require_in_url=\"leorobinovitch\"\n",
")\n",
"\n",
"urls"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Redefine `get_html` so that it returns the raw (undecoded) HTML bytes from a URL:"
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {},
"outputs": [],
"source": [
"def get_html(url):\n",
"    import urllib.request\n",
"    return urllib.request.urlopen(url).read()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"A single threaded approach to getting the raw HTML from all pages:"
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"CPU times: user 120 ms, sys: 20 ms, total: 140 ms\n",
"Wall time: 484 ms\n"
]
}
],
"source": [
"%%time\n",
"results_unthreaded = []\n",
"for url in urls:\n",
"    results_unthreaded.append(get_html(url))\n",
"\n",
"# include a sort for comparison later\n",
"results_unthreaded = sorted(results_unthreaded)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"A threaded approach is faster here (210 ms vs. 484 ms wall time), because a thread releases the GIL while it waits on IO:"
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"CPU times: user 140 ms, sys: 20 ms, total: 160 ms\n",
"Wall time: 210 ms\n"
]
}
],
"source": [
"%%time\n",
"import threading\n",
"\n",
"# A Queue is a thread-safe object, i.e. it implements the\n",
"# required locking semantics so that threads can communicate\n",
"# safely. It operates on the FIFO (first in, first out)\n",
"# principle\n",
"from queue import Queue\n",
"q = Queue()\n",
"\n",
"# helper function to put results in queue\n",
"def put_html_in_queue(url):\n",
"    q.put(get_html(url))\n",
"\n",
"threads = []\n",
"for url in urls:  # one thread per URL\n",
"    t = threading.Thread(target=put_html_in_queue, args=(url,))\n",
"    t.start()\n",
"    threads.append(t)\n",
"\n",
"# ensure program continues only once all threads are completed\n",
"for t in threads:\n",
"    t.join()\n",
"\n",
"# include a sort for comparison later\n",
"results1_threaded = sorted([q.get() for i in range(len(urls))])"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Another way to do threading is `concurrent.futures.ThreadPoolExecutor`. It is equivalent to the threaded approach above, but lets you set the number of threads independently of the number of computations you want to perform. This is helpful if you don't want to, say, start 10,000 threads in a single process:"
]
},
{
"cell_type": "code",
"execution_count": 10,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"CPU times: user 210 ms, sys: 90 ms, total: 300 ms\n",
"Wall time: 322 ms\n"
]
}
],
"source": [
"%%time\n",
"from concurrent.futures import ThreadPoolExecutor\n",
"\n",
"# uses 8 threads\n",
"with ThreadPoolExecutor(max_workers=8) as executor:\n",
"    # returns a lazy iterator over the results, in input order\n",
"    results = executor.map(get_html, urls)\n",
"\n",
"# include a sort for comparison later\n",
"results2_threaded = sorted(list(results))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Proof that all (sorted) results are equivalent:"
]
},
{
"cell_type": "code",
"execution_count": 11,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"True"
]
},
"execution_count": 11,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"results_unthreaded == results1_threaded == results2_threaded"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "py37",
"language": "python",
"name": "py37"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.7.4"
}
},
"nbformat": 4,
"nbformat_minor": 4
}