Last active
February 16, 2018 23:01
-
-
Save dovy/60dcef0085a7b6a8e759aae0ede84d16 to your computer and use it in GitHub Desktop.
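Utility that makes it easy to monitor the jobs on a single Google Cloud Dataproc cluster, or on several clusters that share a name prefix. The script targets Python 2 (it uses the `commands` module) and shells out to the `gcloud` CLI. Be sure to run `pip install beautifultable` before running it.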
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
import argparse
import commands  # Python 2 only; this module was removed in Python 3
import fnmatch
import json
import os
import time

from beautifultable import BeautifulTable
def _setup_done_time(job):
    """Return the ``stateStartTime`` of *job*'s first SETUP_DONE status entry.

    Returns "" when the job has no SETUP_DONE entry yet (e.g. it is still
    pending), instead of raising IndexError the way a bare ``[...][0]`` would.
    """
    for entry in job.get("statusHistory", []):
        if entry.get("state") == "SETUP_DONE":
            return entry.get("stateStartTime", "")
    return ""


def _yarn_app_name(job):
    """Return the name of *job*'s first YARN application, or "" if it has none."""
    apps = job.get("yarnApplications") or []
    if not apps:
        return ""
    return apps[0].get("name", "")


def print_jobs(cluster_name):
    """Print a table of the Dataproc jobs on one cluster, followed by per-state counts.

    Reads the module-level ``args`` dict built in ``__main__``:

    * ``project`` / ``limit`` -- passed to ``gcloud dataproc jobs list``.
    * ``hide_done`` -- when truthy, DONE jobs are counted but not shown; the
      start time of the first DONE job in gcloud's listing order is reported
      as ``LAST_COMPLETED`` instead.
    * ``run_clear`` -- when truthy, the terminal is cleared once and the flag
      is reset, so one refresh pass over several clusters clears only once.

    Args:
        cluster_name: name of the Dataproc cluster whose jobs are listed.
    """
    limit = args.get('limit')
    cmd = "gcloud dataproc jobs list --project {project} --cluster {cluster} {limit} --format json".format(
        project=args["project"],
        cluster=cluster_name,
        limit="--limit {}".format(limit) if limit else "",
    )
    status, resp = commands.getstatusoutput(cmd)

    # Clear after the fetch, so the previous screen stays visible while gcloud runs.
    if args.get('run_clear'):
        os.system('clear')
        args['run_clear'] = False

    if status != 0:
        # getstatusoutput merges stderr into the output, so this is gcloud's
        # error text rather than JSON; report it instead of crashing in json.loads.
        print('\n\n---> {}:\n'.format(cluster_name))
        print('\tgcloud exited with status {}:\n{}'.format(status, resp))
        return
    jobdata = json.loads(resp)

    table = BeautifulTable(max_width=130)
    table.column_headers = ["jobname", "status", "starttime", "jobId"]
    stats = {}
    for job in jobdata:
        # Best effort per job: one malformed entry must not stop the monitor.
        try:
            state = job["status"]["state"]
            stats[state] = stats.get(state, 0) + 1
            setuptime = _setup_done_time(job)
            if state == "DONE" and args['hide_done']:
                if 'LAST_COMPLETED' not in stats:
                    # NOTE(review): assumes gcloud lists newest jobs first, so
                    # the first DONE job seen is the latest one -- confirm.
                    stats['LAST_COMPLETED'] = setuptime
                continue
            table.append_row([_yarn_app_name(job), state, setuptime, job["reference"]["jobId"]])
        except Exception as e:
            print('\tcould not parse job ({!r}): {}'.format(e, job))

    # Attribute spelling kept as in the original; it matches the beautifultable
    # version this script was written against -- confirm before renaming.
    table.row_seperator_char = ""
    table.set_style(BeautifulTable.STYLE_COMPACT)
    print('\n\n---> {}:\n'.format(cluster_name))
    print(table)
    print("".join("\t{}: {}".format(key, value) for key, value in stats.items()))

    if "ERROR" in stats:
        # Count ERROR rows (previously DONE rows were counted here by mistake).
        failed = [row for row in table if row[1] == "ERROR"]
        print("\tJobs that failed: {}".format(len(failed)))
        for row in failed:
            # NOTE(review): prints characters 5-13 of the job name; presumably
            # the distinguishing part of the team's job naming scheme -- confirm.
            print("\t\t" + row[0][5:13])
def start_here(interval=30):
    """Print the job table for every matching cluster, optionally refreshing forever.

    ``args['cluster']`` selects clusters in ``args['project']``. A value
    containing ``*`` is a shell-style wildcard (``prod-*`` selects every
    cluster whose name starts with ``prod-``). Any other value must equal a
    cluster name exactly. Clusters are printed in name order via ``print_jobs``.

    When ``args['loop_repeat']`` is truthy, the pass repeats after sleeping
    ``interval`` seconds until the process is interrupted. This is a loop,
    not recursion, so long-running sessions cannot hit the recursion limit.

    Args:
        interval: seconds to wait between refresh passes (default 30).
    """
    while True:
        # Ask print_jobs to clear the screen once at the start of this pass.
        args['run_clear'] = True
        cmd = "gcloud dataproc clusters list --project {} --format json".format(args["project"])
        status, resp = commands.getstatusoutput(cmd)
        if status != 0:
            # Output is gcloud's error text (stderr is merged), not JSON.
            print("Failed to list clusters (exit status {}):\n{}".format(status, resp))
        else:
            # Match against the pattern without mutating args, so every pass
            # uses the same selection.
            pattern = args['cluster']
            names = sorted(
                item['clusterName'] for item in json.loads(resp)
                if fnmatch.fnmatchcase(item['clusterName'], pattern)
            )
            if not names:
                print("No clusters in project {} match '{}'.".format(args["project"], pattern))
            for name in names:
                print_jobs(cluster_name=name)
        if not args['loop_repeat']:
            return
        time.sleep(interval)
def str_to_bool(value):
    """Parse a command-line boolean such as true/false, yes/no, on/off or 1/0.

    An option declared without ``type`` (or with ``type=bool``) turns any
    non-empty string, including "False", into a truthy value. That made
    ``-r False`` and ``-hd False`` silently do nothing.

    Args:
        value: the raw option string, or an already-parsed bool (the default).

    Returns:
        The parsed boolean.

    Raises:
        argparse.ArgumentTypeError: if *value* is not a recognised boolean.
    """
    if isinstance(value, bool):
        return value
    lowered = value.strip().lower()
    if lowered in ("1", "true", "t", "yes", "y", "on"):
        return True
    if lowered in ("0", "false", "f", "no", "n", "off"):
        return False
    raise argparse.ArgumentTypeError("expected a boolean value, got {!r}".format(value))


if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description="Monitor Google Cloud Dataproc jobs on one or more clusters.")
    parser.add_argument("-p", "--project", default="nyt-pax-dev",
                        help="GCP project to query")
    parser.add_argument("-c", "--cluster", default="none",
                        help="cluster name; use a '*' after a prefix to select several clusters")
    parser.add_argument("-l", "--limit", default="1000",
                        help="maximum number of jobs to list per cluster")
    parser.add_argument("-r", "--loop_repeat", default=True, type=str_to_bool,
                        help="keep refreshing the listing (true/false)")
    parser.add_argument("-hd", "--hide_done", default=True, type=str_to_bool,
                        help="hide DONE jobs from the table (true/false)")
    # print_jobs and start_here read this module-level dict directly.
    args = vars(parser.parse_args())
    start_here()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment