Skip to content

Instantly share code, notes, and snippets.

@josephwilk
Created June 11, 2012 16:23
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save josephwilk/2911006 to your computer and use it in GitHub Desktop.
Save josephwilk/2911006 to your computer and use it in GitHub Desktop.
Wrapper around elastic-mapreduce to make it easier to use
#!/usr/bin/env python
"""Wrapper around elastic-mapreduce to make it easier to use.

Creates a Pig job flow through the elastic-mapreduce command-line client,
polls it until it finishes and prints the state of each step.
"""
import datetime
import json
import os
import subprocess
import sys
import time

# Location of the elastic-mapreduce executable, resolved relative to the
# current user's home directory.
EMR_COMMAND = os.path.expanduser('~/elastic-mapreduce/elastic-mapreduce')

# S3 prefix used to build --log-uri; each job flow logs to <prefix>/<job name>.
EMR_LOGGING_DIR = "s3://songkick/emr-logs"
def create_pig_job_flow(pigscript, num_instances=1, extraArguments=None):
    """Create a job flow that runs a Pig script and return its id.

    The job flow is named ``Pig_Daily_<YYYYmmdd-HHMMSS>`` from the current
    local time and its log URI is ``EMR_LOGGING_DIR/<job name>``.

    Args:
        pigscript: Value passed to ``--pig-script`` (converted with ``str``).
        num_instances: Value passed to ``--num-instances``.
        extraArguments: Optional iterable of values; each one is forwarded
            to the command as its own ``--arg``. ``None`` (the default)
            means no extra arguments.

    Returns:
        The command's standard output with surrounding whitespace removed,
        which the rest of this script uses as the job flow id.

    Raises:
        RuntimeError: If the elastic-mapreduce command exits with a
            non-zero status.
    """
    jobname = "Pig_Daily_" + datetime.datetime.now().strftime('%Y%m%d-%H%M%S')
    # A single pre-formatted string prints the same text whether ``print``
    # is the Python 2 statement or the Python 3 function.
    print("Creating pig job flow %s %s" % (jobname, pigscript))

    args = [EMR_COMMAND,
            "--create",
            "--name", jobname,
            "--num-instances", str(num_instances),
            "--plain-output",
            "--log-uri", "%s/%s" % (EMR_LOGGING_DIR, jobname),
            "--pig-script", str(pigscript)]
    # ``None`` replaces the former mutable ``[]`` default.
    for extra_argument in extraArguments or ():
        args.extend(["--arg", str(extra_argument)])

    # universal_newlines=True yields text (not bytes) on Python 3 as well.
    process = subprocess.Popen(args, stdout=subprocess.PIPE,
                               universal_newlines=True)
    output = process.communicate()[0]
    if process.returncode != 0:
        # Without this check a failed create would hand an empty or
        # meaningless id to the polling code.
        raise RuntimeError("%s --create exited with status %d"
                           % (EMR_COMMAND, process.returncode))

    jobid = output.strip()
    print("Job Id: %s" % jobid)
    return jobid
def get_job_description(jobid):
    """Return the description of a job flow as a parsed JSON object.

    Runs ``EMR_COMMAND --describe -j <jobid>``, parses its standard output
    as JSON and returns the first entry of the ``'JobFlows'`` list.

    Args:
        jobid: Job flow id passed to ``-j``.

    Returns:
        The first element of ``'JobFlows'`` in the decoded output.
    """
    describe_command = [EMR_COMMAND, "--describe", "-j", jobid]
    process = subprocess.Popen(describe_command, stdout=subprocess.PIPE)
    stdout_data, _ = process.communicate()
    described = json.loads(stdout_data)
    return described['JobFlows'][0]
def wait_for_job_completion(jobid, poll_interval=30):
    """Poll a job flow until it has an end time, printing state changes.

    The job description is fetched repeatedly; each time the reported
    ``State`` differs from the previous one, a timestamped line is printed.
    Polling stops once ``ExecutionStatusDetail['EndDateTime']`` is not
    ``None``.

    Args:
        jobid: Job flow id to watch.
        poll_interval: Seconds to sleep between polls. Defaults to 30.

    Returns:
        The last ``State`` value observed for the job flow.
    """
    state = None
    while True:
        status_detail = get_job_description(jobid)['ExecutionStatusDetail']
        if status_detail['State'] != state:
            state = status_detail['State']
            timestamp = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            # One formatted string keeps the output identical on Python 2
            # and Python 3.
            print("%s state: %s" % (timestamp, state))
        if status_detail['EndDateTime'] is not None:
            return state
        time.sleep(poll_interval)
def check_step_status(jobid):
    """Print the state of every step and exit non-zero if any step failed.

    Fetches the job description, prints a blank line and a ``Steps:``
    header, then one line per entry of ``'Steps'`` with its name and state.

    Args:
        jobid: Job flow id whose steps are reported.

    Raises:
        SystemExit: With status 1 when at least one step is in state
            ``FAILED`` or ``CANCELLED``.
    """
    job_description = get_job_description(jobid)
    # ``print("")`` emits the blank line on both Python 2 and Python 3; a
    # bare ``print`` is a no-op expression on Python 3.
    print("")
    print('Steps:')
    failed = False
    for step in job_description['Steps']:
        step_name = step['StepConfig']['Name']
        step_state = step['ExecutionStatusDetail']['State']
        if step_state in ('FAILED', 'CANCELLED'):
            failed = True
        # Matches the Python 2 print-statement output: a space after each
        # item except after the tab, which suppresses the following space.
        print("- %s \t%s" % (step_name, step_state))
    if failed:
        sys.exit(1)
if __name__ == '__main__':
    # Usage: <script> PIGSCRIPT [ARG ...]
    # The first argument is the Pig script; any remaining arguments are
    # forwarded to the job flow as extra arguments.
    if len(sys.argv) < 2:
        # Explain the problem instead of dying with an IndexError.
        sys.stderr.write("usage: %s PIGSCRIPT [ARG ...]\n" % sys.argv[0])
        sys.exit(2)
    pigscript = sys.argv[1]
    # Slicing past the end of the list already yields [], so no length
    # check is needed.
    args = sys.argv[2:]
    jobid = create_pig_job_flow(pigscript, extraArguments=args)
    wait_for_job_completion(jobid)
    check_step_status(jobid)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment