@shawnemhe
Created October 2, 2018 16:41
Using multithreading and multiprocessing to download data files and preprocess them with Pandas
#!/usr/bin/python
"""Downloads and aggregates data files using multiprocessing
This example was taken from project 6 of my Udacity Nanodegree
https://shawnemhe.github.io/udacity-data-analyst/
As a primer, multithreading is well suited to IO tasks, whereas multiprocessing works well
for CPU intensive tasks. The bottlenecks for IO tasks are in the network and disk operations.
Multithreading allows a single process to download multiple files concurrently by not waiting
for each IO tasks to complete before launching the next. However, a task like aggregating
data depends on having dedicated processing power. Even with multiple files, when restricted to
a single processor, each file must be aggregated one at a time. Multiprocessing allows the data to
be divided among processors so that multiple files can be aggregated at the same time.
The target data files are stored at http://stat-computing.org/dataexpo/2009/the-data.html
in csv format and compressed using bzip2. Multithreading is used to download all 22 files
concurrently. As each file is downloaded it is passed to a process executor to calculate the
aggregation data using multiprocessing.
"""
import concurrent.futures
import requests
import os
import pandas as pd
import pickle
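
# One compressed CSV per year, 1987 through 2008: the 22 files mentioned in the docstring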
URLS = [f'http://stat-computing.org/dataexpo/2009/{year}.csv.bz2'
        for year in range(1987, 2009)]
# Define features to load when reading the CSVs into pandas
FEATURES_OF_INTEREST = [
    'Year',
    'UniqueCarrier',
    'DepDelay'
]
# Features to group by
GROUP_BY = ['Year', 'UniqueCarrier']
# Mapping of aggregation functions to features
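# DepDelay is reduced to the share of flights that departed more than 15 minutes late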
AGG_MAP = {
    'DepDelay': lambda x: (x > 15).sum() / len(x)
}
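
# carrier_dict.pkl is assumed to sit alongside this script and contain a plain dict mapping
# carrier codes to full airline names (for example {'AA': 'American Airlines', ...});
# only that code-to-name mapping is relied on below.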
with open('carrier_dict.pkl', 'rb') as file:
    CARRIER_MAP = pickle.load(file)


def load_url(url, process_executor):
    """Downloads the URL if the file does not already exist.

    Passes the filename to the aggregation function via a process executor for concurrent
    aggregation. Returns a reference to the Future object created by the process executor.
    """
    filename = url.split('/')[-1]
    if not os.path.exists(filename):
        print(f'Downloading {url}')
        r = requests.get(url)
        with open(filename, 'wb') as f:
            for chunk in r.iter_content(chunk_size=128):
                f.write(chunk)
        print(f'Downloaded {url}. Passing to aggregator.')
    else:
        print(f'{filename} already exists. Passing to aggregator.')
    return process_executor.submit(aggregate_data, filename)


def aggregate_data(filename):
    """Aggregates data from a raw file.

    The files are stored in bz2 format. Pandas is used to decompress the file, extract
    FEATURES_OF_INTEREST and calculate the aggregated information.
    """
    print(f'Processing {filename}')
    # Unzip and read the csv
    df = pd.read_csv(filename, usecols=FEATURES_OF_INTEREST, compression='bz2')
    # Convert the carrier codes to their full names
    df['UniqueCarrier'] = df['UniqueCarrier'].map(CARRIER_MAP)
    # Aggregate and reduce the dataframe; the GROUP_BY features become the index
    df = df.groupby(GROUP_BY).agg(AGG_MAP)
    print(f'Processed: {filename}')
    return df


if __name__ == '__main__':
    # Verify the feature mapping variables are consistent before beginning
    for feature in GROUP_BY:
        assert feature in FEATURES_OF_INTEREST, f'Unexpected feature {feature} in GROUP_BY.'
    for feature in AGG_MAP.keys():
        assert feature in FEATURES_OF_INTEREST, f'Unexpected feature {feature} in AGG_MAP.'

    # This is where all the concurrent processes and threads are created
    with concurrent.futures.ProcessPoolExecutor() as pe, concurrent.futures.ThreadPoolExecutor() as te:
        # First, the ThreadPoolExecutor is used to download each file
        future_url_request = [te.submit(load_url, url, pe) for url in URLS]

        # As each download thread completes it returns a Future object from the process executor
        processes = []
        for future in concurrent.futures.as_completed(future_url_request):
            processes.append(future.result())

        # As each process completes it returns an aggregated dataframe
        aggregated_data = []
        for future in concurrent.futures.as_completed(processes):
            aggregated_data.append(future.result())

        # Finally, the dataframes are concatenated
        results = pd.concat(aggregated_data)

        # Resetting the index converts the result to tidy data format
        results.reset_index(inplace=True)
        results.to_csv('flight_data.csv')
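
        # Report how many aggregated rows were written (one per Year/UniqueCarrier pair)
        print(f'Wrote {len(results)} aggregated rows to flight_data.csv')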