pyspark inverse index generator
# `sc` is an existing SparkContext (available by default in the pyspark shell).
documents = sc.parallelize([
    ('0', "frequency: the frequency vector of customers' purchases (denoted x in literature)."),
    ('1', "recency: the recency vector of customers' purchases (denoted t_x in literature)."),
    ('2', "T: the vector of customers' age (time since first purchase)"),
    ('3', 'iterative_fitting: perform `iterative_fitting` additional fits to find the best'),
    ('4', 'parameters for the model. Setting to 0 will improve performance but possibly'),
    ('5', 'hurt estimates.'),
    ('6', 'initial_params: set initial params for the iterative fitter.'),
    ('7', 'verbose: set to true to print out convergence diagnostics.'),
])
def inverse_index(rdd):
    # Build an inverted index (word -> set of document ids) from an RDD of (id, text) pairs.
    def parse_documents(key_document):
        key, document = key_document  # tuple-parameter unpacking was removed in Python 3
        for word in document.split():
            yield (word, key)

    def make_set(key):
        # Using the bare `set` constructor here would split a multi-character id into characters.
        return {key}

    def add_value(s, key):
        s.add(key)
        return s

    def add_sets(s1, s2):
        return s1 | s2

    return (rdd.flatMap(parse_documents)
               .combineByKey(make_set, add_value, add_sets)
               .collectAsMap())
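
A quick usage sketch, assuming the `documents` RDD above and a live SparkContext: the result is a plain dict mapping each whitespace-delimited token to the set of document ids that contain it, so punctuation stays attached to tokens.

index = inverse_index(documents)
index['vector']           # {'0', '1', '2'} -- 'vector' appears in the first three documents
index['initial_params:']  # {'6'} -- trailing colon kept, since tokens are split on whitespace only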