@thvasilo
Created October 18, 2018 16:23
An example script to process a text file in parallel using Python
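The script expects its input in LibSVM format: one example per line, consisting of a label followed by index:value pairs (this is what parse_libsvm_line parses). For illustration, a small made-up input file might look like the lines below; the first line is the example from the docstring, the others are invented.

1 5:22 7:44 99:0.88
0 3:1 12:0.5
1 7:44 99:1.0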
import argparse
import multiprocessing as mp
import os
from operator import itemgetter
from collections import Counter
import functools
import json


def parse_args():
    parser = argparse.ArgumentParser()
    parser.add_argument("--input", required=True)
    parser.add_argument("--output", action='store_true', default=False)
    parser.add_argument("--no-stdout", action='store_true', default=False)
    parser.add_argument("--cores", type=int, default=None)
    return parser.parse_args()


def parse_libsvm_line(line: str) -> list:
    """
    Parses a line in a LibSVM file to return the indexes of non-zero features.
    :param line: A line in LibSVM format: "1 5:22 7:44 99:0.88"
    :return: A list of ints, one for each index appearing in the line
    """
    features = line.split()[1:]  # Get rid of the class value
    indexes = [int(pair.split(":")[0]) for pair in features]
    return indexes


def process_wrapper(arg_tuple):
    """
    Applies the line processing function to every line in a chunk of a file,
    to determine the frequency of features in that chunk.
    :param arg_tuple: A tuple that contains: line_process_fun, filename, chunk_start, chunk_size
    :return: A Counter object with the frequency of each feature in the chunk
    """
    line_process_fun, filename, chunk_start, chunk_size = arg_tuple
    counter = Counter()
    # Open in binary mode so chunk_start and chunk_size are treated as exact byte offsets
    with open(filename, 'rb') as f:
        f.seek(chunk_start)
        lines = f.read(chunk_size).decode('utf-8').splitlines()
        for line in lines:
            indexes = line_process_fun(line)
            for index in indexes:
                counter[index] += 1
    return counter


def chunkify(fname, size=1024*1024):
    """
    Creates a generator that indicates how to chunk a file into parts.
    :param fname: The name of the file to be chunked
    :param size: The approximate size of each chunk, in bytes (chunks are extended to the next newline)
    :return: A generator of (chunk_start, chunk_size) tuples for the file
    """
    file_end = os.path.getsize(fname)
    # Open in binary mode so tell()/seek() work with plain byte offsets
    with open(fname, 'rb') as f:
        chunk_end = f.tell()
        while True:
            chunk_start = chunk_end
            f.seek(f.tell() + size, os.SEEK_SET)
            f.readline()  # Advance to the end of the current line so chunks end on line boundaries
            chunk_end = f.tell()
            yield chunk_start, chunk_end - chunk_start
            if chunk_end >= file_end:  # >= avoids yielding an extra empty chunk when a chunk ends exactly at EOF
                break


if __name__ == '__main__':
    args = parse_args()

    pool = mp.Pool(args.cores)
    jobs = []

    # Create one job argument tuple for each chunk of the file
    for chunk_start, chunk_size in chunkify(args.input):
        jobs.append((parse_libsvm_line, args.input, chunk_start, chunk_size))

    # Process the chunks in parallel. The result is a list of Counter objects, one per chunk
    res_list = pool.map(process_wrapper, jobs)

    # Aggregate the per-chunk counters and sort by decreasing count
    aggregated_count = sorted(functools.reduce(lambda a, b: a + b, res_list).items(),
                              key=itemgetter(1), reverse=True)

    # Print the result
    if not args.no_stdout:
        print(aggregated_count)

    # Write the result to a file as JSON (a sorted list of [index, count] lists)
    if args.output:
        with open(args.input + "_frequencies.json", 'w') as out:
            json.dump(aggregated_count, out)

    # Shut down the pool workers
    pool.close()
    pool.join()
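A minimal usage sketch (the script and input file names are placeholders, not part of the gist): saving the script as parallel_feature_count.py and running "python parallel_feature_count.py --input train.libsvm --output --cores 4" writes the sorted counts to train.libsvm_frequencies.json. The snippet below shows one way to inspect that output afterwards; the file path is an assumption based on the naming scheme in the script.

import json

# Hypothetical path, following the "<input>_frequencies.json" naming used by the script above
with open("train.libsvm_frequencies.json") as f:
    frequencies = json.load(f)  # a list of [feature_index, count] pairs, sorted by decreasing count

# Show the ten most frequent feature indexes
for index, count in frequencies[:10]:
    print(f"feature {index}: {count} occurrences")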