Skip to content

Instantly share code, notes, and snippets.

@dovy
Last active February 16, 2018 23:01
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save dovy/60dcef0085a7b6a8e759aae0ede84d16 to your computer and use it in GitHub Desktop.
Save dovy/60dcef0085a7b6a8e759aae0ede84d16 to your computer and use it in GitHub Desktop.
Utility that makes it easy to monitor the jobs of a single Google Cloud Dataproc cluster, or of several clusters that share a name prefix. Run `pip install beautifultable` before using it.
#!/usr/bin/env python
import argparse
import commands
import json
import os
import subprocess
import time

from beautifultable import BeautifulTable
def _list_jobs(cluster_name):
    """Run ``gcloud dataproc jobs list`` for *cluster_name* and return the parsed JSON.

    Reads ``project`` and (optionally) ``limit`` from the module-level ``args`` dict.

    :raises OSError: if the gcloud executable cannot be started.
    :raises subprocess.CalledProcessError: if gcloud exits with a non-zero status.
    :raises ValueError: if gcloud's stdout is not valid JSON.
    """
    cmd = [
        "gcloud", "dataproc", "jobs", "list",
        "--project", args["project"],
        "--cluster", cluster_name,
    ]
    if args.get("limit"):
        cmd += ["--limit", str(args["limit"])]
    cmd += ["--format", "json"]
    # Pass an argv list (no shell) so names are never shell-interpreted, and
    # leave stderr on the terminal so warnings cannot corrupt the JSON on stdout.
    return json.loads(subprocess.check_output(cmd).decode("utf-8"))


def _job_setup_time(job):
    """Return ``stateStartTime`` of the first SETUP_DONE entry in the job's ``statusHistory``.

    Returns "" when there is no such entry, so the job is still listed instead
    of aborting the whole report.
    """
    for entry in job.get("statusHistory") or []:
        if entry.get("state") == "SETUP_DONE":
            return entry.get("stateStartTime", "")
    return ""


def _job_app_name(job):
    """Return the name of the job's first YARN application, or "" if there is none."""
    apps = job.get("yarnApplications") or []
    if apps and "name" in apps[0]:
        return apps[0]["name"]
    return ""


def print_jobs(cluster_name):
    """Print a table of Dataproc jobs plus per-state counts for one cluster.

    Uses the module-level ``args`` dict:

    * ``run_clear`` -- if truthy, the terminal is cleared once (before this
      cluster's output) and the flag is reset to False.
    * ``hide_done`` -- if truthy, DONE jobs are counted but not listed, and the
      setup time of the first DONE job encountered is reported as
      ``LAST_COMPLETED``. NOTE(review): that value is the SETUP_DONE start
      time, not a completion time.

    If listing the jobs fails (gcloud missing, non-zero exit, non-JSON output),
    an error line is printed and the function returns. This lets the caller
    continue with other clusters.

    :param cluster_name: name of the Dataproc cluster whose jobs are shown.
    """
    try:
        jobdata = _list_jobs(cluster_name)
        error = None
    except (OSError, subprocess.CalledProcessError, ValueError) as err:
        jobdata, error = [], err

    # Clear after the gcloud call, so the previous output stays on screen while it runs.
    if args['run_clear']:
        os.system('clear')
        args['run_clear'] = False

    if error is not None:
        print('\n\n---> {}: could not list jobs: {}'.format(cluster_name, error))
        return

    table = BeautifulTable(max_width=130)
    table.column_headers = ["jobname", "status", "starttime", "jobId"]
    rows = []  # plain copy of the listed rows, used for the failure summary
    stats = {}
    for job in jobdata:
        try:
            state = job["status"]["state"]
        except (KeyError, TypeError):
            print('Skipping job entry without a status: {}'.format(job))
            continue
        setuptime = _job_setup_time(job)
        stats[state] = stats.get(state, 0) + 1
        if state == "DONE" and args['hide_done']:
            # Only the first DONE job seen supplies LAST_COMPLETED.
            if 'LAST_COMPLETED' not in stats:
                stats['LAST_COMPLETED'] = setuptime
            continue
        job_id = (job.get("reference") or {}).get("jobId", "")
        row = [_job_app_name(job), state, setuptime, job_id]
        table.append_row(row)
        rows.append(row)

    # NOTE(review): `column_headers` / `row_seperator_char` follow the older
    # beautifultable API; confirm against the installed version.
    table.row_seperator_char = ""
    table.set_style(BeautifulTable.STYLE_COMPACT)
    print('\n\n---> {}:\n'.format(cluster_name))
    print(table)
    print("".join("\t{}: {}".format(key, value) for key, value in stats.items()))

    failed = [row for row in rows if row[1] == "ERROR"]
    if failed:
        print("\tJobs that failed: {}".format(len(failed)))
        for row in failed:
            # Shows characters 5..12 of the job name; presumably the part that
            # identifies the job in this naming scheme -- TODO confirm.
            print("\t\t" + row[0][5:13])
def start_here(interval=30):
    """Show job tables for every Dataproc cluster whose name matches ``args['cluster']``.

    If ``args['cluster']`` contains '*', only the text before the first '*' is
    kept (``args['cluster']`` is updated in place). A cluster matches when that
    text occurs anywhere in its name. The clusters are listed in
    ``args['project']``; matching ones are stored in ``args['clusters']`` and
    shown in sorted order via print_jobs(). While ``args['loop_repeat']`` is
    truthy, the listing is refreshed every *interval* seconds.

    :param interval: seconds to wait between refreshes (default 30).
    :raises OSError, subprocess.CalledProcessError, ValueError: if listing the
        clusters fails while looping is disabled.
    """
    if "*" in args['cluster']:
        args['cluster'] = args['cluster'].split('*')[0]

    # A loop rather than self-recursion: recursing once per refresh would
    # eventually exceed Python's recursion limit in a long-running monitor.
    while True:
        args['run_clear'] = True
        cmd = [
            "gcloud", "dataproc", "clusters", "list",
            "--project", args["project"],
            "--format", "json",
        ]
        try:
            # argv list, no shell; stderr stays on the terminal, not in the JSON.
            clusters = json.loads(subprocess.check_output(cmd).decode("utf-8"))
        except (OSError, subprocess.CalledProcessError, ValueError) as err:
            if not args['loop_repeat']:
                raise
            # While monitoring, report the failure and retry on the next round.
            print('Could not list clusters: {}'.format(err))
            clusters = []

        args['clusters'] = {
            item['clusterName']: item
            for item in clusters
            if args['cluster'] in item['clusterName']
        }
        for name in sorted(args['clusters']):
            print_jobs(cluster_name=name)

        if not args['loop_repeat']:
            return
        time.sleep(interval)
if __name__ == "__main__":
    def _parse_bool(text):
        """Convert a command-line value such as 'true' or 'false' to a bool.

        argparse passes option values as strings. Without this conversion any
        value, including "False", would be truthy.
        """
        if isinstance(text, bool):
            return text
        value = text.strip().lower()
        if value in ("1", "true", "t", "yes", "y", "on"):
            return True
        if value in ("0", "false", "f", "no", "n", "off"):
            return False
        raise argparse.ArgumentTypeError(
            "expected a boolean value, got {!r}".format(text))

    parser = argparse.ArgumentParser(
        description="Monitor jobs on one or more Google Cloud Dataproc clusters.")
    parser.add_argument("-p", "--project", default="nyt-pax-dev",
                        help="GCP project whose clusters are listed")
    parser.add_argument("-c", "--cluster", default="none",
                        help="cluster name to match; text after a '*' is "
                             "dropped and every cluster whose name contains "
                             "the rest is shown")
    parser.add_argument("-l", "--limit", default="1000",
                        help="passed to gcloud as --limit for each cluster's job list")
    parser.add_argument("-r", "--loop_repeat", default=True, type=_parse_bool,
                        help="keep refreshing the listing (true/false)")
    parser.add_argument("-hd", "--hide_done", default=True, type=_parse_bool,
                        help="count DONE jobs but do not list them (true/false)")
    args = vars(parser.parse_args())
    start_here()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment