
@boxysean
Last active February 16, 2020 19:00
Four approaches to multi-threaded extract-and-load code (including a new one)
import aiohttp
import asyncio
import sqlite3

URLS = [
    'http://www.foxnews.com/',
    'http://www.cnn.com/',
    'http://europe.wsj.com/',
    'http://www.bbc.co.uk/',
    'http://some-made-up-domain.com/',
]


async def extract_and_load(url, timeout=30):
    try:
        async with aiohttp.ClientSession() as session:
            async with session.get(url, timeout=timeout) as response:
                web_result = await response.text()

        print(f"{url} is {len(web_result)} bytes")

        with sqlite3.connect('example.db') as conn, conn as cursor:
            cursor.execute('CREATE TABLE IF NOT EXISTS web_results (url text, length int);')
            cursor.execute('INSERT INTO web_results VALUES (?, ?)', (url, len(web_result)))
    except Exception as e:
        print(f"{url} generated an exception: {e}")
        return False
    else:
        return True


async def main():
    succeeded = await asyncio.gather(*[
        extract_and_load(url)
        for url in URLS
    ])

    print(f"Successfully completed {sum(1 for result in succeeded if result)}")


if __name__ == '__main__':
    asyncio.run(main())
"""
The concurrent.futures solution, adapted from the Web Crawl example in
[PEP-3148](https://www.python.org/dev/peps/pep-3148/#id13).
"""
import concurrent.futures
import requests
import sqlite3

URLS = [
    'http://www.foxnews.com/',
    'http://www.cnn.com/',
    'http://europe.wsj.com/',
    'http://www.bbc.co.uk/',
    'http://some-made-up-domain.com/',
]


def extract_and_load(url, timeout=30):
    try:
        web_result = requests.get(url, timeout=timeout).text
        print(f"{url} is {len(web_result)} bytes")
        with sqlite3.connect('example.db') as conn, conn as cursor:
            cursor.execute('CREATE TABLE IF NOT EXISTS web_results (url text, length int);')
            cursor.execute('INSERT INTO web_results VALUES (?, ?)', (url, len(web_result)))
    except Exception as e:
        print(f"{url} generated an exception: {e}")
        return False
    else:
        return True


def main():
    succeeded = []

    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        future_to_url = dict(
            (executor.submit(extract_and_load, url), url)
            for url in URLS
        )

        for future in concurrent.futures.as_completed(future_to_url):
            succeeded.append(future.result())

    print(f"Successfully completed {sum(1 for result in succeeded if result)}")


if __name__ == '__main__':
    main()
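Not part of the original gist: since the values of `future_to_url` are never read above, the same fan-out can also be written with `executor.map`, which returns results in input order. A minimal sketch reusing the `extract_and_load` and `URLS` defined in this file (`main_with_map` is an illustrative name):

import concurrent.futures


def main_with_map():
    # executor.map fans the URLs out to worker threads and yields the
    # results in the same order as the inputs.
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        succeeded = list(executor.map(extract_and_load, URLS))

    print(f"Successfully completed {sum(1 for result in succeeded if result)}")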
"""
My proposed solution.
The new interfaces I have defined elsewhere includes:
- a context manager `pylateral.task_pool` to provide a multi-threaded
worker pool (backed by `concurrent.futures.ThreadPoolContext`)
- a decorator `@pylateral.task` to indicate a function must be run on
the worker pool (if exists)
"""
import requests
import sqlite3

import pylateral

URLS = [
    'http://www.foxnews.com/',
    'http://www.cnn.com/',
    'http://europe.wsj.com/',
    'http://www.bbc.co.uk/',
    'http://some-made-up-domain.com/',
]


@pylateral.task
def extract_and_load(url, timeout=30):
    try:
        web_result = requests.get(url, timeout=timeout).text
        print(f"{url} is {len(web_result)} bytes")
        with sqlite3.connect('example.db') as conn, conn as cursor:
            cursor.execute('CREATE TABLE IF NOT EXISTS web_results (url text, length int);')
            cursor.execute('INSERT INTO web_results VALUES (?, ?)', (url, len(web_result)))
    except Exception as e:
        print(f"{url} generated an exception: {e}")
        return False
    else:
        return True
def main():
    # Provide a multithreaded worker pool to all code executing in this
    # context. The __exit__ method waits for all workers to complete
    # all tasks.
    with pylateral.task_pool(max_workers=5) as manager:
        for url in URLS:
            # The return value is gobbled up by the @pylateral.task
            # decorator and redirected to `manager.results`. Since this
            # runs asynchronously, we can't really get the result in
            # this main execution thread. (If we were using `asyncio`,
            # we could use `await` here.)
            extract_and_load(url)

    # An unordered list of results from the @pylateral.task function
    # calls.
    succeeded = manager.results

    print(f"Successfully completed {sum(1 for result in succeeded if result)}")


if __name__ == '__main__':
    main()
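The pylateral implementation itself lives elsewhere, so the following is only a minimal illustrative sketch of how interfaces like `pylateral.task_pool` and `@pylateral.task` could be built on `concurrent.futures.ThreadPoolExecutor`. The internal names (`TaskPool`, `_current_pool`) are assumptions for illustration, not pylateral's actual internals.

# Illustrative sketch only; not the actual pylateral source.
import concurrent.futures
import functools

_current_pool = None  # the pool installed by the active task_pool context, if any


class TaskPool:
    def __init__(self, max_workers):
        self._executor = concurrent.futures.ThreadPoolExecutor(max_workers=max_workers)
        self._futures = []
        self.results = []  # unordered results of all submitted tasks

    def submit(self, func, *args, **kwargs):
        self._futures.append(self._executor.submit(func, *args, **kwargs))

    def __enter__(self):
        global _current_pool
        _current_pool = self
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # Wait for every submitted task to finish, collect its result,
        # then tear down the worker pool.
        global _current_pool
        for future in concurrent.futures.as_completed(self._futures):
            self.results.append(future.result())
        self._executor.shutdown()
        _current_pool = None


def task_pool(max_workers):
    return TaskPool(max_workers=max_workers)


def task(func):
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        if _current_pool is not None:
            # Inside a task_pool context: run on the pool; the return value
            # ends up in the pool's results rather than being returned here.
            _current_pool.submit(func, *args, **kwargs)
        else:
            # No pool active: fall back to a plain synchronous call.
            return func(*args, **kwargs)
    return wrapper

Under this sketch, `main()` above submits each `extract_and_load(url)` call to the pool, and `manager.results` is populated by the time the `with` block exits.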
"""
Style of glue code I inherited.
"""
import requests
import sqlite3

URLS = [
    'http://www.foxnews.com/',
    'http://www.cnn.com/',
    'http://europe.wsj.com/',
    'http://www.bbc.co.uk/',
    'http://some-made-up-domain.com/',
]


def extract_and_load(url, timeout=30):
    try:
        web_result = requests.get(url, timeout=timeout).text
        print(f"{url} is {len(web_result)} bytes")
        with sqlite3.connect('example.db') as conn, conn as cursor:
            cursor.execute('CREATE TABLE IF NOT EXISTS web_results (url text, length int);')
            cursor.execute('INSERT INTO web_results VALUES (?, ?)', (url, len(web_result)))
    except Exception as e:
        print(f"{url} generated an exception: {e}")
        return False
    else:
        return True


def main():
    succeeded = []

    for url in URLS:
        succeeded.append(extract_and_load(url))

    print(f"Successfully completed {sum(1 for result in succeeded if result)}")


if __name__ == '__main__':
    main()