Created
August 21, 2018 02:38
-
-
Save bradmontgomery/c514f243406459477b9bf05fa7201ef8 to your computer and use it in GitHub Desktop.
Examples of using concurrent.futures with Threads & Processes and the joblib library to read data from a bunch of files & do some simple parallel processing.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
""" | |
process some data | |
""" | |
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor | |
from joblib import Parallel, delayed | |
import os | |
import statistics | |
import sys | |
import time | |
def process_file(filename):
    """Given a file name (or a path to a CSV file), open it, read its contents
    and calculate the mean value for every row of data.

    Blank lines (e.g. a trailing newline at end-of-file) are skipped.
    Return a list of mean values (one per row).
    """
    # NOTE: the csv module would handle quoting/escaping more robustly, but
    # plain comma-splitting is sufficient for the simple integer data here.
    results = []
    with open(filename) as f:
        # Iterate the file object directly instead of f.readlines() so the
        # whole file is never held in memory at once.
        for line in f:
            line = line.strip()
            if not line:
                # Skip empty rows rather than crashing on int("").
                continue
            values = [int(v) for v in line.split(",")]
            results.append(statistics.mean(values))
    return results
if __name__ == "__main__":
    start = time.time()

    # Build a sorted list of paths to every file in the data/ directory.
    data_files = sorted(f"data/{name}" for name in os.listdir("data/"))
    print(f"Found {len(data_files)} files to process...")

    # Serial baseline: process the files one at a time, no parallelism.
    for index, _result in enumerate(map(process_file, data_files)):
        print(f"Processed {data_files[index]}...")

    # Alternative 1 -- threads: run `process_file` concurrently in a
    # thread pool. Uncomment to try it.
    # with ThreadPoolExecutor() as executor:
    #     for index, _result in enumerate(executor.map(process_file, data_files)):
    #         print(f"Processed {data_files[index]}...")

    # Alternative 2 -- processes: the same pattern, but backed by python's
    # multiprocessing module for true multi-core parallelism.
    # with ProcessPoolExecutor() as executor:
    #     for index, _result in enumerate(executor.map(process_file, data_files)):
    #         print(f"Processed {data_files[index]}...")

    # Alternative 3 -- Joblib (pip install joblib). See: https://joblib.readthedocs.io/
    # workers = Parallel(n_jobs=4)
    # tasks = (delayed(process_file)(path) for path in data_files)
    # for index, _result in enumerate(workers(tasks)):
    #     print(f"Processed {data_files[index]}...")

    finish = time.time()
    elapsed = round(finish - start, 4)
    print(f"Processed data in {elapsed} seconds.")
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
""" | |
Generate some dummy data in some files. | |
""" | |
import os | |
import random | |
import time | |
def create_data_file(filename, num_rows=1000, num_columns=10):
    """
    Create a CSV file with `num_rows` rows and `num_columns` columns of
    random integers from 1 - 100 (inclusive).
    """
    # NOTE: the csv module would be the more robust choice, but joining
    # plain integers with commas needs no quoting or escaping.
    with open(filename, "w") as f:
        for _ in range(num_rows):
            row = ",".join(str(random.randint(1, 100)) for _ in range(num_columns))
            f.write(row + "\n")
if __name__ == "__main__":
    file_count = int(input("How many data files do you want? "))
    start = time.time()

    # Ensure the data/ directory exists before writing into it.
    if not os.path.exists('data'):
        os.mkdir('data')

    # Generate one CSV per requested file: data/data_0.csv, data_1.csv, ...
    for path in (f"data/data_{i}.csv" for i in range(file_count)):
        create_data_file(path)

    finish = time.time()
    elapsed = round(finish - start, 2)
    print(f"Generated data in {elapsed} seconds.")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment