SGE QACCT Parsing
A gist by @SamStudio8 (last active November 28, 2023)
"""
Provides functions for parsing a Sun Grid Engine (SGE) accounting file
for job metadata.
"""
__author__ = "Sam Nicholls <msn@aber.ac.uk>"
__copyright__ = "Copyright (c) Sam Nicholls"
__version__ = "0.0.32"
__maintainer__ = "Sam Nicholls <msn@aber.ac.uk>"

import sys
from datetime import datetime


class Account(object):

    def __init__(self, acct_path, parse_extra=None):
        self.fh = open(acct_path)
        self.jobs = {}
        self.parse_extra = parse_extra
        self.fields_encountered = []
        self.parse()

    def parse(self):
        for line in self.fh:
            if line[0] == "#":
                # Skip comments
                continue
            j = self.parse_job(line)
            if j is not None:
                ja = self.annotate_job(j)
                j.update(ja)
                self.jobs[j["jobnumber"]] = j
                self.jobs[j["jobnumber"]]["extra"] = {}
                if self.parse_extra:
                    self.jobs[j["jobnumber"]]["extra"] = self.parse_extra(j, ja)
        self.fh.close()

    def parse_job(self, line):
        """
        Given a valid line from an SGE accounting file, return a dictionary
        keyed by field name. Values are parsed as described in
        http://manpages.ubuntu.com/manpages/lucid/man5/sge_accounting.5.html
        """
        fields = line.strip().split(":")
        try:
            return {
                "qname": fields[0],
                "hostname": fields[1],
                "group": fields[2],
                "owner": fields[3],
                "jobname": fields[4],
                "jobnumber": int(fields[5]),
                "account": fields[6],
                "priority": float(fields[7]),
                "qsub_time": int(fields[8]),
                "start_time": int(fields[9]),
                "end_time": int(fields[10]),
                "failed": int(fields[11]),
                "exit_status": int(fields[12]),
                "ru_wallclock": float(fields[13]),
                "ru_utime": float(fields[14]),
                "ru_stime": float(fields[15]),
                "ru_maxrss": float(fields[16]),
                "ru_ixrss": float(fields[17]),
                "ru_ismrss": float(fields[18]),
                "ru_idrss": float(fields[19]),
                "ru_isrss": float(fields[20]),
                "ru_minflt": float(fields[21]),
                "ru_majflt": float(fields[22]),
                "ru_nswap": float(fields[23]),
                "ru_inblock": float(fields[24]),
                "ru_oublock": float(fields[25]),
                "ru_msgsnd": float(fields[26]),
                "ru_msgrcv": float(fields[27]),
                "ru_nsignals": float(fields[28]),
                "ru_nvcsw": float(fields[29]),
                "ru_nivcsw": float(fields[30]),
                "project": fields[31],
                "department": fields[32],
                "granted_pe": fields[33],
                "slots": int(fields[34]),
                "taskid": int(fields[35]),
                "cpu": float(fields[36]),
                "mem": float(fields[37]),
                "io": float(fields[38]),
                "category": self.parse_category(fields[39]),
                "iow": float(fields[40]),
                "pe_taskid": int(fields[41]) if fields[41] != "NONE" else None,
                "maxvmem": float(fields[42]),
                "arid": fields[43],
                "ar_submission_time": fields[44]
            }
        except IndexError:
            sys.stderr.write("[WARN] Seemingly invalid job line encountered. Skipping.\n")
            return None
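    # Illustrative example (hypothetical values): a line beginning
    #   "large.q:node001:users:msn:myjob:1234:sge:0:..."
    # would yield qname="large.q", hostname="node001", group="users",
    # owner="msn", jobname="myjob", jobnumber=1234, and so on for the
    # remaining colon-delimited fields described in the man page above.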

    @staticmethod
    def parse_category(category_str):
        """
        Parse the queue 'category' field as found in a job line, typically
        containing additional options specified to the queue on submission,
        including runtime and memory resource requests.
        """
        # NOTE I'm not overly happy with the parsing here but wanted to avoid
        #      importing the re library and doing a bunch of regular expressions.
        fields = category_str.split("-")
        req_usergroup = "default"
        req_l = {"h_vmem": None, "h_rt": None, "h_stack": None}
        req_queues = []
        for field in fields:
            if len(field) == 0:
                # The first field will always be empty as the line (should) begin with the delimiter
                continue
            if field[0:2] == "pe":
                # Expected format: "-pe multithread <int>"
                # NOTE Must appear before a check on field[0] == 'p'
                # NOTE key:value not stored as job_dict['slots'] already holds the same data,
                #      but the block here suppresses the "unknown string field" warning in the else.
                pass
            elif field[0] == "U":
                # Expected format: "-U <str>"
                req_usergroup = field.split(" ")[1]
            elif field[0] == "l":
                # Expected format: "-l h_vmem=<float>[Mm|Gg],h_stack=<float>[Mm|Gg],h_rt=<int>"
                for sub_field in field[1:].strip().split(","):
                    key, value = sub_field.strip().split("=")
                    if key.lower() in ("h_stack", "h_vmem"):
                        # Convert value to MB
                        if value[-1].upper() == "G":
                            #TODO Multiply by 1000 or 1024?
                            value = float(value[:-1]) * 1024
                        elif value[-1].upper() == "M":
                            value = float(value[:-1])
                        elif value[-1].isdigit():
                            # Convert bytes to MB
                            value = float(value) / 1000000
                        else:
                            sys.stderr.write("[WARN] Unknown unit of memory encountered parsing -l subfield in queue category field: %s\n" % value[-1].upper())
                            sys.stderr.write("       %s\n" % field.strip())
                    elif key.lower() == "h_rt":
                        value = int(value)
                    else:
                        sys.stderr.write("[WARN] Unknown subfield encountered parsing queue category field: %s\n" % key)
                        sys.stderr.write("       %s\n" % field.strip())
                    # Insert the new key:value pair into the req_l dict
                    req_l[key] = value
            elif field[0] == "q":
                # Expected format: "-q <str1>,<str2>,...,<strN>"
                req_queues = [f.strip() for f in field[1:].split(",")]
            else:
                sys.stderr.write("[WARN] Unknown queue category string field: %s\n" % field)
        return {
            "req_usergroup": req_usergroup,
            "req_l": req_l,
            "req_queues": req_queues,
        }
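    # Illustrative example (hypothetical request): a category string such as
    #   "-U users -l h_vmem=4G,h_rt=86400 -pe multithread 4 -q large.q,small.q"
    # would be parsed to:
    #   {"req_usergroup": "users",
    #    "req_l": {"h_vmem": 4096.0, "h_rt": 86400, "h_stack": None},
    #    "req_queues": ["large.q", "small.q"]}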

    @staticmethod
    def annotate_job(job_dict):
        """
        Given a job dict as created by `parse_job`, return a dict of potentially
        useful additional metadata.

        mem
            mem_req        gibibytes of memory requested per slot
            mem_req_tot    gibibytes of memory requested across all slots
            mem_used       maximum memory used, in gibibytes
            mem_diff       mem_req_tot - mem_used
            mem_pct        memory requested as a percentage of memory actually used

        time
            qsub_dt        qsub_time as datetime
            start_dt       start_time as datetime
            end_dt         end_time as datetime
            time_taken     end_time - start_time, in seconds
            time_taken_td  time_taken as a datetime timedelta
        """
        mem_req = job_dict["category"]["req_l"]["h_vmem"] / 1024
        mem_req_tot = mem_req * job_dict["slots"]
        mem_used = (job_dict["maxvmem"] / 1000000000) * 0.931323
        mem_diff = mem_req_tot - mem_used
        try:
            mem_pct = (mem_req_tot / mem_used) * 100
        except ZeroDivisionError:
            mem_pct = 0

        start_dt = datetime.fromtimestamp(job_dict["start_time"])
        end_dt = datetime.fromtimestamp(job_dict["end_time"])
        time_taken_td = end_dt - start_dt
        time_taken = job_dict["end_time"] - job_dict["start_time"]

        return {
            "mem": {
                "mem_req": mem_req,
                "mem_req_tot": mem_req_tot,
                "mem_used": mem_used,
                "mem_diff": mem_diff,
                "mem_pct": mem_pct
            },
            "time": {
                "qsub_dt": datetime.fromtimestamp(job_dict["qsub_time"]),
                "start_dt": start_dt,
                "end_dt": end_dt,
                "time_taken": time_taken,
                "time_taken_td": time_taken_td
            }
        }
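    # Illustrative example (hypothetical numbers): for a job whose h_vmem was
    # parsed as 4096 MB, with 4 slots and a maxvmem of 2e9 bytes:
    #   mem_req     = 4096 / 1024            = 4.0 GiB per slot
    #   mem_req_tot = 4.0 * 4                = 16.0 GiB
    #   mem_used    = (2e9 / 1e9) * 0.931323 ≈ 1.86 GiB
    #   mem_diff    ≈ 14.14 GiB, mem_pct ≈ 859%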

    def print_job(self, jid, fmt_str):
        fields = fmt_str.split(":")
        job = self.jobs[jid]
        out_str = []
        for field in fields:
            if "." in field:
                field, subfield = field.split(".")
                value = job.get(field, {}).get(subfield, None)
            else:
                value = job.get(field, None)

            if value is not None:
                #TODO Will ignore fields which genuinely are allowed to be None
                if isinstance(value, float):
                    out_str.append("{0:.2f}".format(value))
                else:
                    out_str.append(str(value))
            else:
                # Warn only once per unknown field
                if field not in self.fields_encountered:
                    self.fields_encountered.append(field)
                    sys.stderr.write("[WARN] Field '%s' not found in job object.\n" % field)
        return "\t".join(out_str)
import argparse
import sys

from acct_parse import Account

# Example
# python ~/scripts/acct_print.py "jobnumber:exit_status:mem.mem_req_tot:mem.mem_used:time.time_taken_td" --start 1235367 --end 1236460 [--failed]

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("fmt_string", default="jobnumber:hostname:jobname:exit_status")
    parser.add_argument("--acct_path", default="/cm/shared/apps/sge/6.2u5p2/default/common/accounting")
    parser.add_argument("--start", type=int)
    parser.add_argument("--end", type=int)
    parser.add_argument("-f", "--failed", action="store_true")
    args = parser.parse_args()

    if args.start and args.end:
        if args.start > args.end:
            print("[FAIL] --start is larger than --end, no jobs will be matched.")
            sys.exit(1)

    acct = Account(args.acct_path)
    for jid, job in sorted(acct.jobs.items()):
        jid = int(jid)
        if args.start:
            if jid < args.start:
                continue
        if args.end:
            if jid > args.end:
                continue
        if args.failed:
            if job["exit_status"] == 0:
                continue
        print(acct.print_job(jid, args.fmt_string))
from acct_parse import Account

if __name__ == "__main__":
    acct_path = "/cm/shared/apps/sge/6.2u5p2/default/common/accounting"

    def parse_extra(j, ja):
        # GiB-hours: the per-slot memory request held over the job's runtime, and
        # the requested-but-unused memory (across all slots) over the same period.
        hours_taken = ja["time"]["time_taken_td"].total_seconds() / (60 * 60)
        ram_hours = ja["mem"]["mem_req"] * hours_taken
        wasted_ram_hours = ja["mem"]["mem_diff"] * hours_taken
        return {
            "jobnumber": j["jobnumber"],
            "hours_taken": hours_taken,
            "ram_hours": ram_hours,
            "wasted_ram_hours": wasted_ram_hours
        }
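    # Illustrative example (hypothetical job): with a mem_req of 4 GiB per slot,
    # a mem_diff of 14 GiB and a runtime of 10 hours, parse_extra would report
    # hours_taken = 10, ram_hours = 40 and wasted_ram_hours = 140.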

    acct = Account(acct_path, parse_extra=parse_extra)
    print("jid\tnode\tname\tuser\tgbmem_req\tgbmem_used\tdelta_gbmem\tpct_mem\ttime\tgigaram_hours\twasted_gigaram_hours\texit")
    for job in sorted(acct.jobs.items(), key=lambda x: x[1]["extra"]["wasted_ram_hours"]):
        jid = job[0]
        print(acct.print_job(jid, "jobnumber:hostname:jobname:owner:mem.mem_req_tot:mem.mem_used:mem.mem_diff:mem.mem_pct:time.time_taken_td:extra.ram_hours:extra.wasted_ram_hours:exit_status"))
Comment from @cemdorst:

Just want to say thanks. It worked beautifully!!
