An example script to process a text file in parallel using Python
import argparse
import multiprocessing as mp
import os
from operator import itemgetter
from collections import Counter
import functools
import json

def parse_args():
    parser = argparse.ArgumentParser()
    parser.add_argument("--input", required=True,
                        help="Path to the input file in LibSVM format")
    parser.add_argument("--output", action='store_true', default=False,
                        help="Write the frequencies to <input>_frequencies.json")
    parser.add_argument("--no-stdout", action='store_true', default=False,
                        help="Do not print the aggregated counts to stdout")
    parser.add_argument("--cores", type=int, default=None,
                        help="Number of worker processes (default: all cores)")
    return parser.parse_args()

def parse_libsvm_line(line: str) -> list:
    """
    Parses a line in a LibSVM file to return the indexes of non-zero features.
    :param line: A line in LibSVM format: "1 5:22 7:44 99:0.88"
    :return: A list of ints, one for each feature index appearing in the line
    """
    features = line.split()[1:]  # Get rid of the class value
    indexes = [int(pair.split(":")[0]) for pair in features]
    return indexes

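# For example:
#   parse_libsvm_line("1 5:22 7:44 99:0.88")  ->  [5, 7, 99]
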
def process_wrapper(arg_tuple):
    """
    Applies the process function to every line in a chunk of a file, to determine the frequency
    of features in the chunk.
    :param arg_tuple: A tuple that contains: line_process_fun, filename, chunk_start, chunk_size
    :return: A Counter object that counts the frequency of each feature in the chunk
    """
    line_process_fun, filename, chunk_start, chunk_size = arg_tuple
    counter = Counter()
    # Open in binary mode: the chunk offsets are byte offsets, and in Python 3 a
    # text-mode read(n) consumes n characters rather than n bytes
    with open(filename, 'rb') as f:
        f.seek(chunk_start)
        lines = f.read(chunk_size).decode('utf-8').splitlines()
        for line in lines:
            indexes = line_process_fun(line)
            for index in indexes:
                counter[index] += 1
    return counter

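# For example, process_wrapper((parse_libsvm_line, "data.libsvm", 0, 1024)) would
# return a Counter over the feature indexes in the first 1024 bytes of data.libsvm
# (a hypothetical file). In the script the (start, size) pairs always come from
# chunkify below, so chunks end on line boundaries and no line is parsed in pieces.
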
def chunkify(fname, size=1024*1024):
    """
    Creates a generator that indicates how to chunk a file into parts.
    :param fname: The name of the file to be chunked
    :param size: The size of each chunk, in bytes
    :return: A generator of (chunk_start, chunk_size) tuples for the file
    """
    file_end = os.path.getsize(fname)
    # Binary mode, so that tell()/seek() work with plain byte offsets
    with open(fname, 'rb') as f:
        chunk_end = f.tell()
        while chunk_end < file_end:
            chunk_start = chunk_end
            f.seek(f.tell() + size, os.SEEK_SET)
            # Advance to the end of the current line so chunks never split a line
            f.readline()
            chunk_end = f.tell()
            yield chunk_start, chunk_end - chunk_start

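# For example, if the first line of a file is 20 bytes long (newline included),
# chunkify(fname, size=10) first seeks to byte 10, mid-line, then readline()
# advances to byte 20, so the first tuple yielded is (0, 20).
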
if __name__ == '__main__':
    args = parse_args()

    pool = mp.Pool(args.cores)
    jobs = []

    # Create one job argument tuple for each chunk of the file
    for chunk_start, chunk_size in chunkify(args.input):
        jobs.append((parse_libsvm_line, args.input, chunk_start, chunk_size))

    # Process chunks in parallel. The result is a list of Counter objects
    res_list = pool.map(process_wrapper, jobs)

    # Aggregate the chunk counters and sort by decreasing count
    aggregated_count = sorted(functools.reduce(lambda a, b: a + b, res_list).items(),
                              key=itemgetter(1), reverse=True)

    # Print the result
    if not args.no_stdout:
        print(aggregated_count)

    # Write the result to a file as JSON (sorted list of [index, count] lists)
    if args.output:
        with open(args.input + "_frequencies.json", 'w') as out:
            json.dump(aggregated_count, out)

    # Close the pool and wait for the workers to finish
    pool.close()
    pool.join()
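
To try the pieces end to end without the command line, a minimal sketch along these lines should work (parallel_count.py and sample.libsvm are names chosen here for illustration; the gist file itself is unnamed):

# Build a tiny LibSVM file and count its feature indexes in a single process,
# driving the same chunkify/process_wrapper machinery the script uses
import functools
from collections import Counter

from parallel_count import chunkify, parse_libsvm_line, process_wrapper

with open("sample.libsvm", "w") as f:
    f.write("1 5:22 7:44 99:0.88\n")
    f.write("0 5:1 7:2\n")

jobs = [(parse_libsvm_line, "sample.libsvm", start, size)
        for start, size in chunkify("sample.libsvm")]
counts = functools.reduce(lambda a, b: a + b, map(process_wrapper, jobs), Counter())
print(counts)  # Counter({5: 2, 7: 2, 99: 1})

# The equivalent parallel run through the CLI:
#   python parallel_count.py --input sample.libsvm --output

Plain map() keeps the sketch deterministic and import-safe; swapping in pool.map() as the script does only changes which process executes process_wrapper.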