Using multithreading and multiprocessing to download data files and preprocess them with Pandas
#!/usr/bin/env python3
"""Downloads and aggregates data files using multithreading and multiprocessing

This example was taken from project 6 of my Udacity Nanodegree
https://shawnemhe.github.io/udacity-data-analyst/

As a primer, multithreading is well suited to IO-bound tasks, whereas
multiprocessing works well for CPU-intensive tasks. The bottlenecks for IO
tasks are in the network and disk operations. Multithreading allows a single
process to download multiple files concurrently by not waiting for each IO
task to complete before launching the next. However, a task like aggregating
data depends on having dedicated processing power. Even with multiple files,
when restricted to a single processor, each file must be aggregated one at a
time. Multiprocessing allows the data to be divided among processors so that
multiple files can be aggregated at the same time.

The target data files are stored at http://stat-computing.org/dataexpo/2009/the-data.html
in csv format and compressed using bzip2. Multithreading is used to download
all 22 files concurrently. As each file is downloaded it is passed to a
process executor to calculate the aggregation data using multiprocessing.
"""
import concurrent.futures
import os
import pickle

import pandas as pd
import requests

URLS = [f'http://stat-computing.org/dataexpo/2009/{year}.csv.bz2'
        for year in range(1987, 2009)]

# Define features to load when reading the CSVs into pandas
FEATURES_OF_INTEREST = [
    'Year',
    'UniqueCarrier',
    'DepDelay'
]

# Features to group by
GROUP_BY = ['Year', 'UniqueCarrier']

# Mapping of aggregation functions to features
AGG_MAP = {
    'DepDelay': lambda x: (x > 15).sum() / len(x)
}
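# The DepDelay aggregation computes the share of flights that departed more
# than 15 minutes late, the usual cutoff for counting a departure as delayed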

with open('carrier_dict.pkl', 'rb') as file:
    CARRIER_MAP = pickle.load(file)
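# CARRIER_MAP, loaded above from the pickled carrier_dict.pkl (which must
# exist in the working directory), maps UniqueCarrier codes to full carrier
# names, judging from its use in aggregate_data below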

def load_url(url, process_executor):
    """Downloads the file at url if it does not already exist locally

    Passes the filename to the aggregation function via a process executor
    for concurrent aggregation. Returns the Future object created by the
    process executor.
    """
    filename = url.split('/')[-1]
    if not os.path.exists(filename):
        print(f'Downloading {url}')
        # stream=True avoids buffering the whole response in memory, so the
        # file can be written out in chunks as it arrives
        r = requests.get(url, stream=True)
        with open(filename, 'wb') as f:
            for chunk in r.iter_content(chunk_size=8192):
                f.write(chunk)
        print(f'Downloaded {url}. Passing to aggregator.')
    else:
        print(f'{filename} already exists. Passing to aggregator.')
    return process_executor.submit(aggregate_data, filename)

def aggregate_data(filename):
    """Aggregate data from a raw file

    The files are stored in bz2 format. Pandas is used to decompress the
    file, extract FEATURES_OF_INTEREST and calculate the aggregated
    information.
    """
    print(f'Processing {filename}')
    # Unzip and read the csv
    df = pd.read_csv(filename, usecols=FEATURES_OF_INTEREST, compression='bz2')
    # Convert the carrier codes to their full names
    df['UniqueCarrier'] = df['UniqueCarrier'].map(CARRIER_MAP)
    # Aggregate and reduce the dataframe
    df = df.groupby(GROUP_BY).agg(AGG_MAP)
    print(f'Processed: {filename}')
    return df
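# Each per-file result is a small frame indexed by (Year, UniqueCarrier) with
# a single DepDelay column holding that pair's delayed-departure fraction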

if __name__ == '__main__':
    # Verify feature mapping variables are correct before beginning
    for feature in GROUP_BY:
        assert feature in FEATURES_OF_INTEREST, f'Unexpected feature {feature} in group_by.'
    for feature in AGG_MAP.keys():
        assert feature in FEATURES_OF_INTEREST, f'Unexpected feature {feature} in aggregation map.'

    # This is where all the concurrent processes and threads are created
    with concurrent.futures.ProcessPoolExecutor() as pe, \
            concurrent.futures.ThreadPoolExecutor() as te:
        # First, the ThreadPoolExecutor is used to download each file
        future_url_request = [te.submit(load_url, url, pe) for url in URLS]

        # As each download thread completes it returns a Future object from
        # the process executor
        processes = []
        for future in concurrent.futures.as_completed(future_url_request):
            processes.append(future.result())

        # As each process completes it returns an aggregated dataframe
        aggregated_data = []
        for future in concurrent.futures.as_completed(processes):
            aggregated_data.append(future.result())

        # Finally, the dataframes are concatenated
        results = pd.concat(aggregated_data)
        # Resetting the index converts the result to tidy data format
        results.reset_index(inplace=True)
        # index=False keeps the meaningless RangeIndex out of the output file
        results.to_csv('flight_data.csv', index=False)
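
For reference, here is a minimal, self-contained sketch of the same threads-for-IO, processes-for-CPU pattern with the flight-data specifics stripped out. The names (fetch, crunch) and the example URLs are illustrative stand-ins, not part of the gist; time.sleep simulates the network wait.

import concurrent.futures
import time


def fetch(url):
    # Stand-in for a network-bound download
    time.sleep(0.1)
    return f'payload from {url}'


def crunch(payload):
    # Stand-in for a CPU-bound aggregation
    return sum(ord(c) for c in payload)


if __name__ == '__main__':
    urls = [f'http://example.com/{i}' for i in range(5)]
    with concurrent.futures.ProcessPoolExecutor() as pe, \
            concurrent.futures.ThreadPoolExecutor() as te:
        # Threads overlap the simulated IO waits...
        downloads = [te.submit(fetch, url) for url in urls]
        # ...and each finished download is handed off to a worker process
        crunched = [pe.submit(crunch, f.result())
                    for f in concurrent.futures.as_completed(downloads)]
        print([f.result() for f in concurrent.futures.as_completed(crunched)])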