@dfdeshom
Last active February 27, 2018 20:14
#!/usr/bin/env python2.7
import json
import random

from mrjob.job import MRJob


class TestStdIn(MRJob):

    @staticmethod
    def candidate_key(doc):
        # Key a document by its candidate id.
        return doc.get('candidate_id')

    @staticmethod
    def candidate_resume_key(doc):
        # Key a resume by the (candidate_id, resume_id) pair.
        return doc['candidate_id'], doc['resume_id']

    def most_recent_docs(self, rdd):
        # For each key, keep only the most recently created document.
        tc = 'time_created'
        return rdd.reduceByKey(lambda x, y: x if x[tc] >= y[tc] else y)

    def merge_candidate_resume(self, candidate_id, candidate, resume):
        # Drop the pair if either side is missing.
        if not candidate or not resume:
            return
        return candidate

    def spark(self, input_path, output_path):
        # self.stdin = None
        # self.stdout = None
        import pyspark
        spark = (pyspark.sql.SparkSession.builder
                 .appName(self.__class__.__name__)
                 .config("spark.scheduler.mode", "FAIR")
                 .getOrCreate())
        sc = spark.sparkContext

        # Toy input: 50 copies of one resume and one candidate record.
        resumes0 = [{'xml': 'xml', 'time_created': 1.0,
                     'candidate_id': 11, 'resume_id': random.randint(0, 1)}] * 50
        candidates0 = [{'time_created': 1.0, 'candidate_id': 11}] * 50

        # Dedupe resumes by (candidate_id, resume_id), then re-key them
        # by candidate_id so they can be joined against candidates.
        resumes = (self.most_recent_docs(sc.parallelize(resumes0)
                                         .keyBy(self.candidate_resume_key))
                   .values()
                   .keyBy(self.candidate_key))

        # Dedupe candidates by candidate_id.
        candidates = self.most_recent_docs(sc.parallelize(candidates0)
                                           .keyBy(self.candidate_key))

        # Inner-join on candidate_id, merge each (candidate, resume)
        # pair, and drop empty merge results.
        candidate_resume = (candidates.join(resumes)
                            .map(lambda (candidate_id, (candidate, resume)):
                                 self.merge_candidate_resume(candidate_id,
                                                             candidate,
                                                             resume))
                            .filter(lambda x: x)
                            # .coalesce(500)
                            )
        candidate_resume.cache()
        print candidate_resume.take(3)
        sc.stop()


if __name__ == "__main__":
    TestStdIn.run()
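A note on portability: the tuple-unpacking lambda in the join step (`lambda (candidate_id, (candidate, resume)): ...`) and the bare `print` statement are Python 2 only; PEP 3113 removed parameter tuple unpacking in Python 3. Below is a minimal, self-contained Python 3 sketch of the same dedupe-then-join pattern; the sample records and the "JoinSketch" app name are hypothetical, not taken from the gist.

#!/usr/bin/env python3
# Minimal Python 3 sketch of the same pattern: dedupe each RDD by key
# (keeping the newest record), then inner-join and merge. Sample data
# below is hypothetical.
from pyspark.sql import SparkSession


def newest(x, y):
    # Keep whichever document was created most recently.
    return x if x['time_created'] >= y['time_created'] else y


spark = SparkSession.builder.appName("JoinSketch").getOrCreate()
sc = spark.sparkContext

resumes = (sc.parallelize([{'candidate_id': 11, 'resume_id': 0,
                            'time_created': t} for t in (1.0, 2.0)])
           .keyBy(lambda d: (d['candidate_id'], d['resume_id']))
           .reduceByKey(newest)
           .values()
           .keyBy(lambda d: d['candidate_id']))

candidates = (sc.parallelize([{'candidate_id': 11, 'time_created': 1.0}])
              .keyBy(lambda d: d['candidate_id'])
              .reduceByKey(newest))

# join() yields (key, (candidate, resume)) pairs. Python 3 lambdas
# cannot unpack tuples in their signature (PEP 3113), so index into
# the pair instead.
merged = (candidates.join(resumes)
          .map(lambda kv: kv[1][0] if kv[1][0] and kv[1][1] else None)
          .filter(lambda x: x))
print(merged.take(3))
sc.stop()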