Using multithreading and multiprocessing to download data files and preprocess them with Pandas
"""Downloads and aggregates data files using multithreading and multiprocessing.

This example was taken from project 6 of my Udacity Nanodegree.

As a primer, multithreading is well suited to IO-bound tasks, whereas multiprocessing works well
for CPU-intensive tasks. The bottlenecks for IO tasks are network and disk operations.
Multithreading allows a single process to download multiple files concurrently by not waiting
for each IO task to complete before launching the next. However, a task like aggregating
data depends on having dedicated processing power. Even with multiple files, when restricted to
a single processor, the files must be aggregated one at a time. Multiprocessing allows the data to
be divided among processors so that multiple files can be aggregated at the same time.
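
For illustration, here is a minimal sketch of the two executor types, separate from the script
itself. The helpers fake_download, count_primes, download_all and crunch_all are hypothetical
stand-ins (a sleep in place of the network wait, a prime count in place of the aggregation) and
are not part of this project:

```python
import concurrent.futures
import time


def fake_download(name, delay=0.05):
    # Stand-in for a network request: while a thread sleeps (or waits on IO),
    # other threads in the same process are free to run.
    time.sleep(delay)
    return name


def count_primes(limit):
    # Stand-in for CPU-bound work such as aggregating a large file.
    return sum(all(n % d for d in range(2, int(n ** 0.5) + 1))
               for n in range(2, limit))


def download_all(names):
    # Threads overlap the waiting, so the total time is far less than
    # the sum of the individual delays.
    with concurrent.futures.ThreadPoolExecutor() as te:
        return list(te.map(fake_download, names))


def crunch_all(limits):
    # Separate processes can run on separate cores, so the computations overlap.
    with concurrent.futures.ProcessPoolExecutor() as pe:
        return list(pe.map(count_primes, limits))
```

When used from a script, crunch_all should be called under an `if __name__ == '__main__':` guard,
as this script does, because worker processes may re-import the main module.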

The target data files are stored at
in CSV format and compressed using bzip2. Multithreading is used to download all 22 files
concurrently. As each file is downloaded, it is passed to a process executor, which calculates
the aggregated data using multiprocessing.
"""

import concurrent.futures
import requests
import os
import pandas as pd
import pickle

URLS = [f'{year}.csv.bz2'
        for year in range(1987, 2009)]
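
# Define features to load when reading the CSVs into pandas.
# NOTE: the original list is not shown here. This minimal set is an assumption that
# covers only the columns referenced by GROUP_BY and AGG_MAP below.
FEATURES_OF_INTEREST = ['Year', 'UniqueCarrier', 'DepDelay']

# Features to group by
GROUP_BY = ['Year', 'UniqueCarrier']

# Mapping of aggregation functions to features
AGG_MAP = {
    # Fraction of flights that departed more than 15 minutes late
    'DepDelay': lambda x: (x > 15).sum() / len(x),
}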

with open('carrier_dict.pkl', 'rb') as file:
    CARRIER_MAP = pickle.load(file)


def load_url(url, process_executor):
    """Downloads URL if the file does not already exist

    Passes filename to aggregation function via a process executor for concurrent aggregation.
    Returns a reference to the future object created by the process executor
    """
    filename = url.split('/')[-1]
    if not os.path.exists(filename):
        print(f'Downloading {url}')
        r = requests.get(url)
        with open(filename, 'wb') as f:
            # Write the response body to disk in small chunks
            for chunk in r.iter_content(chunk_size=128):
                f.write(chunk)
        print(f'Downloaded {url}. Passing to aggregator.')
    else:
        print(f'{filename} already exists. Passing to aggregator.')
    return process_executor.submit(aggregate_data, filename)


def aggregate_data(filename):
    """Aggregate data from raw file

    The files are stored in bz2 format. Pandas is used to decompress the file, extract
    FEATURES_OF_INTEREST and calculate aggregated information.
    """
    print(f'Processing {filename}')
    # Unzip and read the csv
    df = pd.read_csv(filename, usecols=FEATURES_OF_INTEREST, compression='bz2')
    # Convert the carrier codes to their full names
    df['UniqueCarrier'] = df['UniqueCarrier'].map(CARRIER_MAP)
    # Aggregate and reduce the dataframe
    df = df.groupby(GROUP_BY).agg(AGG_MAP)
    # df.set_index(GROUP_BY, inplace=True)
    print(f'Processed: {filename}')
    return df


if __name__ == '__main__':
    # Verify feature mapping variables are correct before beginning
    for feature in GROUP_BY:
        assert feature in FEATURES_OF_INTEREST, f'Unexpected feature {feature} in group_by.'
    for feature in AGG_MAP.keys():
        assert feature in FEATURES_OF_INTEREST, f'Unexpected feature {feature} in aggregation map.'

    # This is where all the concurrent processes and threads are created
    with concurrent.futures.ProcessPoolExecutor() as pe, concurrent.futures.ThreadPoolExecutor() as te:
        # First, the ThreadPoolExecutor is used to download each file.
        future_url_request = [te.submit(load_url, url, pe) for url in URLS]

        # As each download thread completes it returns a Future object from the process executor
        processes = []
        for future in concurrent.futures.as_completed(future_url_request):
            processes.append(future.result())

        # As each process completes it returns an aggregated dataframe
        aggregated_data = []
        for future in concurrent.futures.as_completed(processes):
            aggregated_data.append(future.result())

    # Finally, the dataframes are concatenated
    results = pd.concat(aggregated_data)
    # Resetting the index converts the result to tidy data format
    results = results.reset_index()