Python Multiprocessing
This gist exists to keep track of modifications to code from a blog post.
The first revision of this gist is the code as it exists on the blog post at the time of this writing. Follow-up revisions are proposed improvements.
This gist exists to keep track of modifications to code from a blog post.
The first revision of this gist is the code as it exists on the blog post at the time of this writing. Follow-up revisions are proposed improvements.
from __future__ import print_function

from concurrent.futures import ThreadPoolExecutor
import logging
import time

# Module-level logger; used below for debug output and error reporting.
LOG = logging.getLogger(__name__)

# When true, ``fetch`` and ``diff`` use local fakes instead of the network
# and the third-party ``datadiff`` package.
TESTING = True
class FakeDiff(object):
    """
    Fake diff implementation for demonstration only.

    It never compares its inputs: ``diffs`` is always an empty list, so code
    that checks ``len(d.diffs)`` always sees "no differences".
    """

    def __init__(self, a, b):
        # ``a`` and ``b`` are accepted so the constructor can be called like a
        # two-argument diff function; both are intentionally ignored.
        self.diffs = []
def fetch(url, timeout=10):
    """
    Return the decoded JSON payload for *url*.

    When the module-level ``TESTING`` flag is true, no request is made: the
    call sleeps for a random 0-3 seconds to simulate a slow network and then
    returns a canned payload holding a single result. Otherwise the URL is
    fetched with ``requests`` and the response body is decoded as JSON.

    :param url: URL to fetch.
    :param timeout: Seconds to wait for the real HTTP request before
        ``requests`` raises; ignored in test mode. Without a timeout a
        stalled server would block the calling thread indefinitely.
    :return: The decoded JSON document.
    """
    if TESTING:
        LOG.debug('Fetching %r using fake GET', url)
        from time import sleep
        from random import randint
        sleep(randint(0, 3))  # simulate slow network
        return {
            'results': [{
                'types': {'postal_code'},
                'address_components': {'United States'},
            }]
        }

    # Imported lazily: only the real (non-testing) code path needs requests.
    import requests
    return requests.get(url, timeout=timeout).json()
def diff(a, b):
    """
    Return a diff object describing the differences between *a* and *b*.

    In test mode (module-level ``TESTING`` is true) a ``FakeDiff`` is
    returned, which always reports no differences. Otherwise the comparison
    is delegated to the third-party ``datadiff`` package.

    :param a: First value to compare.
    :param b: Second value to compare.
    :return: ``FakeDiff(a, b)`` in test mode, else ``datadiff.diff(a, b)``.
    """
    if TESTING:
        LOG.debug('Diffing %r with %r using fake diff', a, b)
        return FakeDiff(a, b)

    # Alias the import: a plain ``from datadiff import diff`` would rebind
    # this function's own name inside its body, which is easy to misread.
    from datadiff import diff as datadiff_diff
    return datadiff_diff(a, b)
def getzip(code, retries=3):
    """
    Check whether *code* geocodes to a United States postal code.

    The code is converted to a string, looked up through ``fetch`` against
    the Google geocoding URL, and the first result is inspected: it counts
    as a US zip code when ``'postal_code'`` is among its ``types`` and the
    text ``"United States"`` appears in its ``address_components``.

    An empty result list triggers a retry. Retries are capped by *retries*
    so that a lookup which never yields results cannot recurse or loop
    forever; once the attempts are used up the code is reported as invalid.

    Any exception raised along the way is logged with its traceback and the
    code is reported as invalid rather than propagating the error.

    :param code: Zip code to check (anything ``str()`` accepts).
    :param retries: Maximum number of additional attempts after an empty
        result list.
    :return: ``(code, iszip)`` where ``code`` is the string form of the
        input and ``iszip`` is a bool.
    """
    iszip = False
    try:
        code = str(code)
        url = "https://maps.googleapis.com/maps/api/geocode/json?address={}".format(code)
        for attempt in range(retries + 1):
            res = fetch(url)['results']
            if res:
                first = res[0]
                iszip = ('postal_code' in first['types']
                         and "United States" in str(first['address_components']))
                break
            if attempt < retries:
                print("Retrying")
    except Exception:
        # Let's not silence the traceback: it contains valuable information
        # for debugging. ``LOG.exception`` emits a high-severity message that
        # includes it. (A quieter alternative would be
        # ``LOG.debug('In error', exc_info=True)``; using both would log the
        # same traceback twice.)
        LOG.exception('In error')
        iszip = False
    return (code, iszip)
# Driver script: check a range of zip codes once sequentially and once with a
# thread pool, time both runs, and verify that they produce the same results.
ziprange = range(94400, 94420)
print("Range is: %s" % len(ziprange))  # Using %-formatting is a bit cleaner

print("Using one thread")
start = time.time()
syncres = [getzip(c) for c in ziprange]
print("took %s" % (time.time() - start))

print("Using multiple threads")
start = time.time()
with ThreadPoolExecutor(max_workers=10) as executor:
    results = executor.map(getzip, ziprange)
    # ``Executor.map`` yields results in the order of its input, so the list
    # already lines up with ``syncres``. Sorting only one of the two lists
    # would report a spurious diff whenever the input order differs from
    # the sorted order of the (string) codes.
    asyncres = list(results)
print("took %s" % (time.time() - start))

# Make sure results are equal
d = diff(syncres, asyncres)
if len(d.diffs) > 0:
    print("diff is")
    print(d)

for r in asyncres:
    print("Zip code {} is {} US code".format(r[0], "valid" if r[1] else "invalid"))