SGE QACCT Parsing
"""
Provides functions for the purpose of parsing a Sun Grid Engine (SGE)
accounting file for job metadata.
"""
__author__ = "Sam Nicholls <msn@aber.ac.uk>"
__copyright__ = "Copyright (c) Sam Nicholls"
__version__ = "0.0.32"
__maintainer__ = "Sam Nicholls <msn@aber.ac.uk>"
import sys
from datetime import datetime
class Account(object):
def __init__(self, acct_path, parse_extra=None):
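"""
Open and immediately parse the accounting file at `acct_path`.
Parsed jobs are stored in `self.jobs`, keyed by job number. If `parse_extra`
is given it is called as parse_extra(job_dict, annotations) for each job
and its return value is stored under job_dict["extra"].
"""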
self.fh = open(acct_path)
self.jobs = {}
self.parse_extra = parse_extra
self.fields_encountered = []
self.parse()
def parse(self):
for line in self.fh:
if line[0] == "#":
# Skip comments
continue
j = self.parse_job(line)
if j is not None:
ja = self.annotate_job(j)
j.update(ja)
self.jobs[j["jobnumber"]] = j
self.jobs[j["jobnumber"]]["extra"] = {}
if self.parse_extra:
self.jobs[j["jobnumber"]]["extra"] = self.parse_extra(j, ja)
self.fh.close()
def parse_job(self, line):
"""
Given a valid line from an SGE accounting file, return a dictionary
keyed by field name. Values are parsed as described in
http://manpages.ubuntu.com/manpages/lucid/man5/sge_accounting.5.html
"""
fields = line.strip().split(":")
try:
return {
"qname": fields[0],
"hostname": fields[1],
"group": fields[2],
"owner": fields[3],
"jobname": fields[4],
"jobnumber": int(fields[5]),
"account": fields[6],
"priority": float(fields[7]),
"qsub_time": int(fields[8]),
"start_time": int(fields[9]),
"end_time": int(fields[10]),
"failed": int(fields[11]),
"exit_status": int(fields[12]),
"ru_wallclock": float(fields[13]),
"ru_utime": float(fields[14]),
"ru_stime": float(fields[15]),
"ru_maxrss": float(fields[16]),
"ru_ixrss": float(fields[17]),
"ru_ismrss": float(fields[18]),
"ru_idrss": float(fields[19]),
"ru_isrss": float(fields[20]),
"ru_minflt": float(fields[21]),
"ru_majflt": float(fields[22]),
"ru_nswap": float(fields[23]),
"ru_inblock": float(fields[24]),
"ru_oublock": float(fields[25]),
"ru_msgsnd": float(fields[26]),
"ru_msgrcv": float(fields[27]),
"ru_nsignals": float(fields[28]),
"ru_nvcsw": float(fields[29]),
"ru_nivcsw": float(fields[30]),
"project": fields[31],
"department": fields[32],
"granted_pe": fields[33],
"slots": int(fields[34]),
"taskid": int(fields[35]),
"cpu": float(fields[36]),
"mem": float(fields[37]),
"io": float(fields[38]),
"category": self.parse_category(fields[39]),
"iow": float(fields[40]),
"pe_taskid": int(fields[41]) if fields[41] != "NONE" else None,
"maxvmem": float(fields[42]),
"arid": fields[43],
"ar_submission_time": fields[44]
}
except IndexError:
sys.stderr.write("[WARN] Seemingly invalid job line encountered. Skipping.\n")
return None
@staticmethod
def parse_category(category_str):
"""
Parse the queue 'category' field as found in a job line, typically
containing additional options specified to the queue on submission,
including runtime and memory resource requests.
"""
# NOTE I'm not overly happy with the parsing here but wanted to avoid
# importing the re library and doing a bunch of regular expressions.
fields = category_str.split("-")
req_usergroup = "default"
req_l = {"h_vmem": None, "h_rt": None, "h_stack": None}
req_queues = []
for field in fields:
if len(field) == 0:
# First field will always be empty as the category string (should) begin with the delimiter
continue
if field[0:2] == "pe":
# Expected format: "-pe multithread <int>"
# NOTE Must appear before a check on field[0] == 'p'
# NOTE key:value not stored as job_dict['slots'] already holds the same data,
# but this branch suppresses the "unknown field" warning in the else below.
pass
elif field[0] == "U":
# Expected format: "-U <str>"
req_usergroup = field.split(" ")[1]
elif field[0] == "l":
# Expected format: "-l h_vmem=<float>[Mm|Gg],h_stack=<float>[Mm|Gg],h_rt=<int>"
for sub_field in field[1:].strip().split(","):
key, value = sub_field.strip().split("=")
if key.lower() == "h_stack" or key.lower() == "h_vmem":
# Convert value to MB
if value[-1].upper() == "G":
#TODO Multiply by 1000 or 1024?
value = float(value[:-1]) * 1024
elif value[-1].upper() == "M":
value = float(value[:-1])
elif value[-1].isdigit():
# Convert bytes to MB
value = float(value) / 1000000
else:
sys.stderr.write("[WARN] Unknown unit of memory encountered parsing -l subfield in queue category field: %s\n" % value[-1].upper())
sys.stderr.write(" %s\n" % field.strip())
elif key.lower() == "h_rt":
value = int(value)
else:
sys.stderr.write("[WARN] Unknown subfield encountered parsing queue category field: %s\n" % key)
sys.stderr.write(" %s\n" % field.strip())
# Insert the new key:value pair to the req_l dict
req_l[key] = value
elif field[0] == "q":
# Expected format: "-q <str1>,<str2>,...,<strN>"
req_queues = [f.strip() for f in field[1:].split(",")]
else:
sys.stderr.write("[WARN] Unknown queue category string field: %s\n" % field)
return {
"req_usergroup": req_usergroup,
"req_l": req_l,
"req_queues": req_queues,
}
@staticmethod
def annotate_job(job_dict):
"""
Given a job dict as created by `parse_job`, return a dict of potentially
useful additional metadata.
mem
mem_req gibibytes of memory requested per slot
mem_req_tot gibibytes of memory requested across all slots
mem_used maximum memory used in gibibytes
mem_diff mem_req_tot - mem_used
mem_pct mem_req_tot as a percentage of mem_used
time
qsub_dt qsub_time as datetime
start_dt start_time as datetime
end_dt end_time as datetime
time_taken end_time - start_time (seconds)
time_taken_td time_taken as datetime timedelta
"""
mem_req = (job_dict["category"]["req_l"]["h_vmem"]/1024)
mem_req_tot = mem_req * job_dict["slots"]
# maxvmem is reported in bytes; convert to GiB (1e9 / 2**30 ~= 0.931323)
mem_used = (job_dict["maxvmem"]/1000000000)*0.931323
mem_diff = mem_req_tot - mem_used
try:
mem_pct = (mem_req_tot / mem_used) * 100
except ZeroDivisionError:
mem_pct = 0
start_dt = datetime.fromtimestamp(job_dict["start_time"])
end_dt = datetime.fromtimestamp(job_dict["end_time"])
time_taken_td = end_dt - start_dt
time_taken = job_dict["end_time"] - job_dict["start_time"]
return {
"mem": {
"mem_req": mem_req,
"mem_req_tot": mem_req_tot,
"mem_used": mem_used,
"mem_diff": mem_diff,
"mem_pct": mem_pct
},
"time": {
"qsub_dt": datetime.fromtimestamp(job_dict["qsub_time"]),
"start_dt": datetime.fromtimestamp(job_dict["start_time"]),
"end_dt": datetime.fromtimestamp(job_dict["end_time"]),
"time_taken": time_taken,
"time_taken_td": time_taken_td
}
}
def print_job(self, jid, fmt_str):
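"""
Return a tab-delimited string of the requested fields for the job `jid`.
`fmt_str` is a colon-delimited list of field names; nested fields are
addressed as parent.subfield (e.g. "mem.mem_used" or "time.time_taken_td").
Fields that cannot be found are warned about and omitted from the output.
"""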
fields = fmt_str.split(":")
job = self.jobs[jid]
out_str = []
for field in fields:
if "." in field:
field, subfield = field.split(".")
value = job.get(field, {}).get(subfield, None)
else:
value = job.get(field, None)
if value is not None:
#TODO Will ignore fields which genuinely are allowed to be None
if isinstance(value, float):
out_str.append("{0:.2f}".format(value))
else:
out_str.append(str(value))
else:
if field not in self.fields_encountered:
self.fields_encountered.append(field)
sys.stderr.write("[WARN] Field '%s' not found in job object.\n" % field)
return "\t".join(out_str)
# acct_print.py
# Example CLI that prints selected fields for a range of jobs, e.g.:
#   python ~/scripts/acct_print.py "jobnumber:exit_status:mem.mem_req_tot:mem.mem_used:time.time_taken_td" --start 1235367 --end 1236460 [--failed]
import argparse
import sys
from acct_parse import Account
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("fmt_string", default="jobnumber:hostname:jobname:exit_status")
parser.add_argument("--acct_path", default="/cm/shared/apps/sge/6.2u5p2/default/common/accounting")
parser.add_argument("--start", type=int)
parser.add_argument("--end", type=int)
parser.add_argument("-f", "--failed", action="store_true")
args = parser.parse_args()
if args.start and args.end:
if args.start > args.end:
print("[FAIL] --start larger than --end, no jobs will be matched.")
sys.exit(1)
acct = Account(args.acct_path)
for jid, job in sorted(acct.jobs.items()):
jid = int(jid)
if args.start:
if jid < args.start:
continue
if args.end:
if jid > args.end:
continue
if args.failed:
if job["exit_status"] == 0:
continue
print acct.print_job(jid, args.fmt_string)
# Example report: ranks jobs by "wasted RAM hours" computed via a parse_extra callback.
from acct_parse import Account
if __name__ == "__main__":
acct_path = "/cm/shared/apps/sge/6.2u5p2/default/common/accounting"
def parse_extra(j, ja):
hours_taken = ja["time"]["time_taken_td"].total_seconds() / (60*60)
ram_hours = ja["mem"]["mem_req"] * hours_taken
wasted_ram_hours = ja["mem"]["mem_diff"] * hours_taken
return {
"jobnumber": j["jobnumber"],
"hours_taken": hours_taken,
"ram_hours": ram_hours,
"wasted_ram_hours": wasted_ram_hours
}
acct = Account(acct_path, parse_extra=parse_extra)
print "jid\tnode\tname\tuser\tgbmem_req\tgbmem_used\tdelta_gbmem\tpct_mem\ttime\tgigaram_hours\twasted_gigaram_hours\texit"
for jid, job in sorted(acct.jobs.items(), key=lambda x: x[1]["extra"]["wasted_ram_hours"]):
print acct.print_job(jid, "jobnumber:hostname:jobname:owner:mem.mem_req_tot:mem.mem_used:mem.mem_diff:mem.mem_pct:time.time_taken_td:extra.ram_hours:extra.wasted_ram_hours:exit_status")
@techie879 commented May 7, 2018

Hi,

This seems to be a very good solution for parsing the SGE accounting file. Is there a guide on how to use it? I tried the acct_parse.py script (i.e. python ./acct_parse.py /data/hpc/sge/xdmod/accounting_in_pieces/_acctae), but it did not return anything.

thanks.
