Skip to content

Instantly share code, notes, and snippets.

@nojhan
Last active December 15, 2015 00:29
Show Gist options
  • Save nojhan/5173639 to your computer and use it in GitHub Desktop.
Save nojhan/5173639 to your computer and use it in GitHub Desktop.
Compute the histogram of a csv file with a python parallel program
import sys
from multiprocessing import Pool
# Compute the histogram of a csv file with a python parallel program
# Input file format:
# name,count\n
# Output the frequency of "count".
# Stolen from http://mikecvet.wordpress.com/2010/07/02/parallel-mapreduce-in-python/
# Adapted to histogram computation instead of word count
# And ported to python3
"""
Given a list of tokens, return a list of tuples of
titlecased (or proper noun) tokens and a count of '1'.
Also remove any leading or trailing punctuation from
each token.
"""
def Map(L):
results = []
for w in L:
results.append ((w, 1))
return results
"""
Group the sublists of (token, 1) pairs into a term-frequency-list
map, so that the Reduce operation later can work on sorted
term counts. The returned result is a dictionary with the structure
{token : [(token, 1), ...] .. }
"""
def Partition(L):
tf = {}
for sublist in L:
for p in sublist:
# Append the tuple to the list in the map
try:
tf[p[0]].append (p)
except KeyError:
tf[p[0]] = [p]
return tf
"""
Given a (token, [(token, 1) ...]) tuple, collapse all the
count tuples from the Map operation into a single term frequency
number for this token, and return a final tuple (token, frequency).
"""
def Reduce(Mapping):
return (Mapping[0], sum(pair[1] for pair in Mapping[1]))
"""
Load the contents the file at the given
path into a big list and return it.
"""
def load(path):
stars = []
with open(path, "r") as f:
for line in f:
stars.append(int(line.split()[1]))
# Efficiently concatenate Python string objects
# return (''.join(stars)).split ()
return stars
"""
A generator function for chopping up a given list into chunks of
length n.
"""
def chunks(l, n):
for i in range(0, len(l), n):
yield l[i:i+n]
if __name__ == '__main__':
if (len(sys.argv) != 3):
print("Usage: phisto file nprocs")
sys.exit(1)
nprocs=int(sys.argv[2])
print("Load file, stuff it into a string",file=sys.stderr)
stars = load (sys.argv[1])
print("Build a pool of 8 processes",file=sys.stderr)
pool = Pool(processes=nprocs)
print("Fragment the string data into 8 chunks",file=sys.stderr)
partitioned_stars = list(chunks(stars, len(stars) // nprocs))
print("Generate count tuples for title-cased tokens",file=sys.stderr)
single_count_tuples = pool.map(Map, partitioned_stars)
print("Organize the count tuples; lists of tuples by token key",file=sys.stderr)
token_to_tuples = Partition(single_count_tuples)
print("Collapse the lists of tuples into total term frequencies",file=sys.stderr)
term_frequencies = pool.map(Reduce, token_to_tuples.items())
print("Sort the term frequencies in increasing order",file=sys.stderr)
# term_frequencies.sort(key=lambda x: x[1]) # nb of projects
term_frequencies.sort(key=lambda x: x[0]) # nb of stars
for pair in term_frequencies[:20]:
print( "%i occurs %i times" % pair )
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment