public
Created

A wrapper around the elastic-mapreduce command-line tool that makes it easier to use.

  • Download Gist
emr.py
Python
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65
#!/usr/bin/env python
"""Wrapper around the elastic-mapreduce command-line tool for Pig job flows.

Usage: emr.py PIGSCRIPT [EXTRA_ARG ...]

Creates a job flow running PIGSCRIPT, waits for it to finish, prints the
state of every step and exits with status 1 if any step failed or was
cancelled.
"""

import datetime
import json
import os
import subprocess
import sys
import time

# Path to the elastic-mapreduce command-line client that every function here
# shells out to.
EMR_COMMAND = os.path.expanduser('~/elastic-mapreduce/elastic-mapreduce')
# S3 prefix for job flow logs; each job flow logs to "<this prefix>/<job name>".
EMR_LOGGING_DIR = "s3://songkick/emr-logs"
 
def create_pig_job_flow(pigscript, num_instances=1, extraArguments=None):
    """Create a new EMR job flow that runs a single Pig script.

    The job flow is named ``Pig_Daily_<YYYYmmdd-HHMMSS>`` (local time) and
    its logs go to ``EMR_LOGGING_DIR/<job name>``.

    :param pigscript: the Pig script, passed (as a string) to ``--pig-script``.
    :param num_instances: number of instances requested for the job flow.
    :param extraArguments: optional iterable of extra values; each one is
        passed to the command as ``--arg <value>``. Defaults to none.
    :returns: the stripped standard output of ``elastic-mapreduce --create``,
        i.e. the job flow id.
    :raises subprocess.CalledProcessError: if the command exits non-zero.
    """
    # None instead of a mutable [] default, which would be shared across calls.
    if extraArguments is None:
        extraArguments = []
    jobname = "Pig_Daily_" + datetime.datetime.now().strftime('%Y%m%d-%H%M%S')
    # Single-argument print gives the same output on Python 2 and 3.
    print("Creating pig job flow %s %s" % (jobname, pigscript))
    args = [EMR_COMMAND,
            "--create",
            "--name", jobname,
            "--num-instances", str(num_instances),
            "--plain-output",
            "--log-uri", "%s/%s" % (EMR_LOGGING_DIR, jobname),
            "--pig-script", str(pigscript)]
    for extra_argument in extraArguments:
        # Every extra value needs its own --arg flag.
        args.extend(["--arg", str(extra_argument)])
    # check_output raises on a non-zero exit instead of silently returning an
    # empty job id; universal_newlines yields str (not bytes) on Python 3.
    jobid = subprocess.check_output(args, universal_newlines=True).strip()
    print("Job Id: %s" % jobid)
    return jobid
 
def get_job_description(jobid):
    """Return the description of one EMR job flow.

    Runs ``elastic-mapreduce --describe -j <jobid>`` and parses its output
    as JSON.

    :param jobid: job flow id, as returned by ``create_pig_job_flow``.
    :returns: the first element of the ``JobFlows`` list in the JSON output.
        Callers in this script read ``ExecutionStatusDetail`` and ``Steps``
        from it.
    :raises subprocess.CalledProcessError: if the command exits non-zero.
    :raises ValueError: if the output is not valid JSON.
    """
    # check_output surfaces a failed describe as CalledProcessError instead of
    # a confusing JSON decode error on empty output.
    output = subprocess.check_output([EMR_COMMAND, "--describe", "-j", jobid],
                                     universal_newlines=True)
    job_description = json.loads(output)['JobFlows'][0]
    return job_description
 
def wait_for_job_completion(jobid, poll_interval=30):
    """Block until the given EMR job flow has ended.

    Polls ``get_job_description`` and prints a timestamped line whenever the
    job flow's ``ExecutionStatusDetail.State`` changes. The job flow is
    treated as ended once ``ExecutionStatusDetail.EndDateTime`` is not None.
    Whether it succeeded is not checked here; ``check_step_status`` does that.

    :param jobid: job flow id to watch.
    :param poll_interval: seconds to sleep between polls (default 30).
    """
    last_state = None
    while True:
        status_detail = get_job_description(jobid)['ExecutionStatusDetail']
        state = status_detail['State']
        # Only report transitions, not every poll.
        if state != last_state:
            last_state = state
            timestamp = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            print("%s state: %s" % (timestamp, state))

        if status_detail['EndDateTime'] is not None:
            return
        time.sleep(poll_interval)
 
def check_step_status(jobid):
    """Print every step of a job flow with its state; exit(1) on any failure.

    A step counts as failed when its state is ``FAILED`` or ``CANCELLED``.
    All steps are listed before the process exits, so the full picture is
    always printed.

    :param jobid: job flow id whose steps are reported.
    """
    description = get_job_description(jobid)
    print("")
    print('Steps:')
    any_failed = False
    for entry in description['Steps']:
        name = entry['StepConfig']['Name']
        state = entry['ExecutionStatusDetail']['State']
        any_failed = any_failed or state in ('FAILED', 'CANCELLED')
        # Matches the Python 2 `print '-', name, '\t', state` output: no
        # separator space is emitted after the tab.
        print("- %s \t%s" % (name, state))
    if any_failed:
        sys.exit(1)
 
if __name__ == '__main__':
    # Usage: emr.py PIGSCRIPT [EXTRA_ARG ...]
    # Every argument after the Pig script is forwarded to it via --arg.
    if len(sys.argv) < 2:
        # sys.exit with a string prints it to stderr and exits with status 1,
        # instead of crashing with an IndexError traceback.
        sys.exit("usage: %s PIGSCRIPT [EXTRA_ARG ...]" % sys.argv[0])
    pigscript = sys.argv[1]
    # Slicing past the end yields [], so no separate length check is needed.
    args = sys.argv[2:]
    jobid = create_pig_job_flow(pigscript, extraArguments=args)
    wait_for_job_completion(jobid)
    # Exits with status 1 if any step failed or was cancelled.
    check_step_status(jobid)

Please sign in to comment on this gist.

Something went wrong with that request. Please try again.