Last active
September 13, 2015 01:45
-
-
Save CamDavidsonPilon/9a5c59c69c87ca65eaba to your computer and use it in GitHub Desktop.
pyspark inverse index generator
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Sample corpus as (doc_id, text) pairs, distributed as an RDD.
# NOTE(review): assumes `sc` is a live pyspark SparkContext already in
# scope (standard in a pyspark shell) — confirm before running as a script.
documents = sc.parallelize([
    ('0', "frequency: the frequency vector of customers' purchases (denoted x in literature)."),
    ('1', "recency: the recency vector of customers' purchases (denoted t_x in literature)."),
    ('2', "T: the vector of customers' age (time since first purchase)"),
    ('3', 'iterative_fitting: perform `iterative_fitting` additional fits to find the best'),
    ('4', 'parameters for the model. Setting to 0 will improve peformance but possibly'),
    ('5', 'hurt estimates.'),
    ('6', 'initial_params: set initial params for the iterative fitter.'),
    ('7', 'verbose: set to true to print out convergence diagnostics.'),
])
def inverse_index(rdd):
    """Build an inverted index from an RDD of (key, document) pairs.

    Each document string is tokenized on whitespace; the result maps every
    token to the set of document keys whose text contains it.

    Parameters
    ----------
    rdd : RDD of (key, document) tuples, where `document` is a string.

    Returns
    -------
    dict
        word -> set of document keys, collected to the driver.
    """
    def parse_documents(kv):
        # Emit one (word, key) pair per whitespace-separated token.
        # (Python 3: tuple parameters were removed by PEP 3113, so unpack
        # explicitly instead of `def parse_documents((key, document))`.)
        key, document = kv
        for word in document.split():
            yield (word, key)

    def make_set(key):
        # createCombiner: singleton set holding the first key seen.
        # Using bare `set` here would be a bug: set('10') == {'1', '0'},
        # exploding any multi-character key into its characters.
        return {key}

    def add_value(keys, key):
        # mergeValue: fold one more document key into a partition-local set.
        keys.add(key)
        return keys

    def add_sets(s1, s2):
        # mergeCombiners: union partial sets from different partitions.
        return s1 | s2

    return rdd.flatMap(parse_documents)\
              .combineByKey(make_set, add_value, add_sets)\
              .collectAsMap()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment