Last active
November 28, 2023 16:12
-
-
Save SamStudio8/7f2edcfda17906e3941b to your computer and use it in GitHub Desktop.
SGE QACCT Parsing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Provides functions for the purpose of parsing a Sun Grid Engine (SGE) | |
accounting file for job metadata. | |
""" | |
__author__ = "Sam Nicholls <msn@aber.ac.uk>" | |
__copyright__ = "Copyright (c) Sam Nicholls" | |
__version__ = "0.0.32" | |
__maintainer__ = "Sam Nicholls <msn@aber.ac.uk>" | |
import sys | |
from datetime import datetime | |
class Account(object):
    """
    Parse a Sun Grid Engine (SGE) accounting file and expose its jobs.

    After construction, `self.jobs` maps each job number (int) to a dict of
    the raw accounting fields (see `parse_job`), merged with the annotation
    dict from `annotate_job`, plus an "extra" key holding the result of the
    optional `parse_extra` callback.
    """

    def __init__(self, acct_path, parse_extra=None):
        """
        Open the accounting file at `acct_path` and parse it immediately.

        parse_extra, if given, is called as parse_extra(job_dict, annotation_dict)
        for each job and its return value is stored under job["extra"].
        """
        self.fh = open(acct_path)
        self.jobs = {}
        self.parse_extra = parse_extra
        # Fields already warned about by print_job, to avoid repeat warnings
        self.fields_encountered = []
        self.parse()

    def parse(self):
        """Read every line of the accounting file, populating self.jobs."""
        # try/finally guarantees the handle is closed even if a line raises,
        # where the original flow would have leaked it.
        try:
            for line in self.fh:
                if line[0] == "#":
                    # Skip comments
                    continue
                j = self.parse_job(line)
                if j is None:
                    continue
                ja = self.annotate_job(j)
                j.update(ja)
                j["extra"] = {}
                if self.parse_extra:
                    j["extra"] = self.parse_extra(j, ja)
                self.jobs[j["jobnumber"]] = j
        finally:
            self.fh.close()

    def parse_job(self, line):
        """
        Given a valid line from an SGE accounting file, return a dictionary
        of each of the fields as a key. Values are parsed as described in
        http://manpages.ubuntu.com/manpages/lucid/man5/sge_accounting.5.html

        Returns None (with a warning on stderr) for lines with too few
        fields or with numeric fields that fail to parse.
        """
        fields = line.strip().split(":")
        try:
            return {
                "qname": fields[0],
                "hostname": fields[1],
                "group": fields[2],
                "owner": fields[3],
                "jobname": fields[4],
                "jobnumber": int(fields[5]),
                "account": fields[6],
                "priority": float(fields[7]),
                "qsub_time": int(fields[8]),
                "start_time": int(fields[9]),
                "end_time": int(fields[10]),
                "failed": int(fields[11]),
                "exit_status": int(fields[12]),
                "ru_wallclock": float(fields[13]),
                "ru_utime": float(fields[14]),
                "ru_stime": float(fields[15]),
                "ru_maxrss": float(fields[16]),
                "ru_ixrss": float(fields[17]),
                "ru_ismrss": float(fields[18]),
                "ru_idrss": float(fields[19]),
                "ru_isrss": float(fields[20]),
                "ru_minflt": float(fields[21]),
                "ru_majflt": float(fields[22]),
                "ru_nswap": float(fields[23]),
                "ru_inblock": float(fields[24]),
                "ru_oublock": float(fields[25]),
                "ru_msgsnd": float(fields[26]),
                "ru_msgrcv": float(fields[27]),
                "ru_nsignals": float(fields[28]),
                "ru_nvcsw": float(fields[29]),
                "ru_nivcsw": float(fields[30]),
                "project": fields[31],
                "department": fields[32],
                "granted_pe": fields[33],
                "slots": int(fields[34]),
                "taskid": int(fields[35]),
                "cpu": float(fields[36]),
                "mem": float(fields[37]),
                "io": float(fields[38]),
                "category": self.parse_category(fields[39]),
                "iow": float(fields[40]),
                "pe_taskid": int(fields[41]) if fields[41] != "NONE" else None,
                "maxvmem": float(fields[42]),
                "arid": fields[43],
                "ar_submission_time": fields[44]
            }
        except (IndexError, ValueError):
            # IndexError: too few fields; ValueError: a numeric field (or a
            # category subfield) failed to parse. The original code caught
            # only IndexError, so one bad number aborted the whole parse.
            sys.stderr.write("[WARN] Seemingly invalid job line encountered. Skipping.\n")
            return None

    @staticmethod
    def parse_category(category_str):
        """
        Parse the queue 'category' field as found in a job line, typically
        containing additional options specified to the queue on submission,
        including runtime and memory resource requests.

        Returns a dict with keys req_usergroup (str), req_l (dict of -l
        resource requests, memory values converted to MB) and req_queues
        (list of queue names).
        """
        # NOTE I'm not overly happy with the parsing here but wanted to avoid
        # importing the re library and doing a bunch of regular expressions.
        # NOTE(review): splitting on "-" will mis-parse any option value that
        # itself contains a hyphen (e.g. "-q long-queue").
        fields = category_str.split("-")
        req_usergroup = "default"
        req_l = {"h_vmem": None, "h_rt": None, "h_stack": None}
        req_queues = []
        for field in fields:
            if len(field) == 0:
                # First field will always be empty as line (should) begin with delimiter
                continue
            if field[0:2] == "pe":
                # Expected format: "-pe multithread <int>"
                # NOTE Must appear before a check on field[0] == 'p'
                # NOTE key:value not stored as job_dict['slots'] already has the same data
                # but block here suppresses "unknown string data" error in else.
                pass
            elif field[0] == "U":
                # Expected format: "-U <str>"
                req_usergroup = field.split(" ")[1]
            elif field[0] == "l":
                # Expected format: "-l h_vmem=<float>[Mm|Gg],h_stack=<float>[Mm|Gg],h_rt=<int>"
                for sub_field in field[1:].strip().split(","):
                    key, value = sub_field.strip().split("=")
                    if key.lower() == "h_stack" or key.lower() == "h_vmem":
                        # Convert value to MB
                        if value[-1].upper() == "G":
                            #TODO Multiply by 1000 or 1024?
                            value = float(value[:-1]) * 1024
                        elif value[-1].upper() == "M":
                            value = float(value[:-1])
                        elif value[-1].isdigit() is True:
                            # Convert bytes to MB
                            value = float(value) / 1000000
                        else:
                            sys.stderr.write("[WARN] Unknown unit of memory encountered parsing -l subfield in queue category field: %s\n" % value[-1].upper())
                            sys.stderr.write("       %s\n" % field.strip())
                    elif key.lower() == "h_rt":
                        value = int(value)
                    else:
                        sys.stderr.write("[WARN] Unknown subfield encountered parsing queue category field: %s\n" % key)
                        sys.stderr.write("       %s\n" % field.strip())
                    # Insert the new key:value pair to the req_l dict
                    req_l[key] = value
            elif field[0] == "q":
                # Expected format: "-q <str1>,<str2>,...,<strN>"
                req_queues = [f.strip() for f in field[1:].split(",")[0:]]
            else:
                sys.stderr.write("[WARN] Unknown queue category string field: %s\n" % field)
        return {
            "req_usergroup": req_usergroup,
            "req_l": req_l,
            "req_queues": req_queues,
        }

    @staticmethod
    def annotate_job(job_dict):
        """
        Given a job dict as created by `parse_job`, return a dict of potentially
        useful additional metadata.
        mem
            mem_req         gibibytes of memory requested per slot
            mem_req_tot     gibibytes of memory requested across all slots
            mem_used        maximum memory used in gibibytes
            mem_diff        mem_req_tot - mem_used
            mem_pct         percentage of memory requested in respect to amount needed
        time
            qsub_dt         qsub_time as datetime
            start_dt        start_time as datetime
            end_dt          end_time as datetime
            time_taken      end_time - start_time (as unix timestamp)
            time_taken_td   time_taken as datetime timedelta
        """
        # Jobs submitted without "-l h_vmem" leave h_vmem as None, which
        # previously caused a TypeError here; treat as a request of 0.
        h_vmem = job_dict["category"]["req_l"]["h_vmem"]
        mem_req = (h_vmem / 1024.0) if h_vmem is not None else 0.0
        mem_req_tot = mem_req * job_dict["slots"]
        # maxvmem is in bytes; /1e9 gives GB, *0.931323 converts GB to GiB
        mem_used = (job_dict["maxvmem"] / 1000000000) * 0.931323
        mem_diff = mem_req_tot - mem_used
        try:
            mem_pct = (mem_req_tot / mem_used) * 100
        except ZeroDivisionError:
            mem_pct = 0
        start_dt = datetime.fromtimestamp(job_dict["start_time"])
        end_dt = datetime.fromtimestamp(job_dict["end_time"])
        time_taken_td = end_dt - start_dt
        time_taken = job_dict["end_time"] - job_dict["start_time"]
        return {
            "mem": {
                "mem_req": mem_req,
                "mem_req_tot": mem_req_tot,
                "mem_used": mem_used,
                "mem_diff": mem_diff,
                "mem_pct": mem_pct
            },
            "time": {
                "qsub_dt": datetime.fromtimestamp(job_dict["qsub_time"]),
                "start_dt": start_dt,
                "end_dt": end_dt,
                "time_taken": time_taken,
                "time_taken_td": time_taken_td
            }
        }

    def print_job(self, jid, fmt_str):
        """
        Return a tab-delimited string of the fields named in `fmt_str`
        (colon-delimited, with "outer.inner" addressing nested dicts such as
        "mem.mem_req") for job `jid`. Floats are formatted to 2dp. Unknown
        fields are warned about once on stderr and omitted from the output.
        """
        fields = fmt_str.split(":")
        job = self.jobs[jid]
        out_str = []
        for field in fields:
            if "." in field:
                field, subfield = field.split(".")
                value = job.get(field, {}).get(subfield, None)
            else:
                value = job.get(field, None)
            if value is not None:
                #TODO Will ignore fields which genuinely are allowed to be None
                if type(value) is float:
                    out_str.append("{0:.2f}".format(value))
                else:
                    out_str.append(str(value))
            else:
                if field not in self.fields_encountered:
                    self.fields_encountered.append(field)
                    sys.stderr.write("[WARN] Field '%s' not found in job object.\n" % field)
        return "\t".join(out_str)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import argparse | |
import sys | |
from acct_parse import Account | |
# Example | |
# python ~/scripts/acct_print.py "jobnumber:exit_status:mem.mem_req_tot:mem.mem_used:time.time_taken_td" --start 1235367 --end 1236460 [--failed] | |
if __name__ == "__main__": | |
parser = argparse.ArgumentParser() | |
parser.add_argument("fmt_string", default="jobnumber:hostname:jobname:exit_status") | |
parser.add_argument("--acct_path", default="/cm/shared/apps/sge/6.2u5p2/default/common/accounting") | |
parser.add_argument("--start", type=int) | |
parser.add_argument("--end", type=int) | |
parser.add_argument("-f", "--failed", action="store_true") | |
args = parser.parse_args() | |
if args.start and args.end: | |
if args.start > args.end: | |
print("[FAIL] --start larger than --end, no jobs will be matched.") | |
sys.exit(1) | |
acct = Account(args.acct_path) | |
for jid, job in sorted(acct.jobs.items()): | |
jid = int(jid) | |
if args.start: | |
if jid < args.start: | |
continue | |
if args.end: | |
if jid > args.end: | |
continue | |
if args.failed: | |
if job["exit_status"] == 0: | |
continue | |
print acct.print_job(jid, args.fmt_string) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from acct_parse import Account | |
if __name__ == "__main__": | |
acct_path = "/cm/shared/apps/sge/6.2u5p2/default/common/accounting" | |
def parse_extra(j, ja): | |
hours_taken = ja["time"]["time_taken_td"].total_seconds() / (60*60) | |
ram_hours = ja["mem"]["mem_req"] * hours_taken | |
wasted_ram_hours = ja["mem"]["mem_diff"] * hours_taken | |
return { | |
"jobnumber": j["jobnumber"], | |
"hours_taken": hours_taken, | |
"ram_hours": ram_hours, | |
"wasted_ram_hours": wasted_ram_hours | |
} | |
acct = Account(acct_path, parse_extra=parse_extra) | |
print "jid\tnode\tname\tuser\tgbmem_req\tgbmem_used\tdelta_gbmem\tpct_mem\ttime\tgigaram_hours\twasted_gigaram_hours\texit" | |
for job in sorted(acct.jobs.items(), key=lambda x:x[1]["extra"]['wasted_ram_hours']): | |
jid = job[0] | |
print acct.print_job(jid, "jobnumber:hostname:jobname:owner:mem.mem_req_tot:mem.mem_used:mem.mem_diff:mem.mem_pct:time.time_taken_td:extra.ram_hours:extra.wasted_ram_hours:exit_status") |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Just want to say thanks. It worked beautifully!!