@bradmontgomery
Created August 21, 2018 02:38
Examples of using concurrent.futures (with threads and processes) and the joblib library to read data from a bunch of files and do some simple parallel processing.
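All three approaches in the processing script below boil down to the same thing: map a single per-file function over a list of file paths and let the executor decide how to parallelize the calls. As a quick orientation, here is a condensed, simplified sketch of the three variants (the file paths are hypothetical, and `row_means` is just a stand-in for the `process_file` function defined below):

from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from joblib import Parallel, delayed
import statistics


def row_means(filename):
    # Same idea as process_file below: one mean per CSV row.
    with open(filename) as f:
        return [statistics.mean(int(v) for v in line.strip().split(",")) for line in f]


if __name__ == "__main__":
    files = ["data/data_0.csv", "data/data_1.csv"]  # hypothetical paths

    # Threads: executor.map yields results in input order.
    with ThreadPoolExecutor() as executor:
        thread_results = list(executor.map(row_means, files))

    # Processes: same API, but the work runs in separate worker processes.
    with ProcessPoolExecutor() as executor:
        process_results = list(executor.map(row_means, files))

    # joblib: Parallel + delayed build and run the list of jobs.
    joblib_results = Parallel(n_jobs=4)(delayed(row_means)(f) for f in files)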
#!/usr/bin/env python
"""
Process some data.
"""
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from joblib import Parallel, delayed

import os
import statistics
import sys
import time


def process_file(filename):
    """Given a file name (or a path to a csv file), open it, read its contents,
    and calculate the mean value for every row of data.

    Return a list of mean values (one per row).
    """
    # TODO: it'd probably be a good idea to use the csv module for this.
    # Check out Douglas' Pluralsight course on using the python standard library
    # to learn more about the csv module!
    results = []
    with open(filename) as f:
        for line in f.readlines():
            values = line.strip().split(",")
            values = [int(v) for v in values]
            results.append(statistics.mean(values))
    return results


if __name__ == "__main__":
    t1 = time.time()
    files = sorted([f"data/{file}" for file in os.listdir("data/")])
    print(f"Found {len(files)} files to process...")

    # The following code doesn't use any parallel processing. It processes
    # each file, one at a time.
    for i, result in enumerate(map(process_file, files)):
        print(f"Processed {files[i]}...")

    # Uncomment the following to run `process_file` in multiple threads.
    # with ThreadPoolExecutor() as executor:
    #     for i, result in enumerate(executor.map(process_file, files)):
    #         print(f"Processed {files[i]}...")

    # Uncomment the following to execute the same function using multiple
    # processes. This makes use of python's multiprocessing module.
    # with ProcessPoolExecutor() as executor:
    #     for i, result in enumerate(executor.map(process_file, files)):
    #         print(f"Processed {files[i]}...")

    # Uncomment the following to use joblib (pip install joblib).
    # See: https://joblib.readthedocs.io/
    # processors = Parallel(n_jobs=4)
    # params = (delayed(process_file)(file) for file in files)
    # jobs = processors(params)
    # for i, result in enumerate(jobs):
    #     print(f"Processed {files[i]}...")

    t2 = time.time()
    elapsed = round(t2 - t1, 4)
    print(f"Processed data in {elapsed} seconds.")
#!/usr/bin/env python
"""
Generate some dummy data in some files.
"""
import os
import random
import time


def create_data_file(filename, num_rows=1000, num_columns=10):
    """
    Create a CSV file with `num_rows` rows and `num_columns` columns of random
    integers from 1 - 100.
    """
    # TODO: it'd probably be a good idea to use the csv module for this.
    # Check out Douglas' Pluralsight course on using the python standard library
    # to learn more about the csv module!
    with open(filename, "w") as f:
        for _ in range(num_rows):
            values = [str(random.randint(1, 100)) for _ in range(num_columns)]
            values = ",".join(values)
            f.write(values + "\n")


if __name__ == "__main__":
    num_files = int(input("How many data files do you want? "))
    t1 = time.time()

    # Make a data/ directory if it doesn't exist...
    if not os.path.exists('data'):
        os.mkdir('data')

    # Create a list of data file names...
    files = [f"data/data_{i}.csv" for i in range(num_files)]
    for file in files:
        create_data_file(file)

    t2 = time.time()
    elapsed = round(t2 - t1, 2)
    print(f"Generated data in {elapsed} seconds.")