Skip to content

Instantly share code, notes, and snippets.

What would you like to do?
Provides functions for the purpose of parsing a Sun Grid Engine (SGE)
accounting file for job metadata.
__author__ = "Sam Nicholls <>"
__copyright__ = "Copyright (c) Sam Nicholls"
__version__ = "0.0.32"
__maintainer__ = "Sam Nicholls <>"
import sys
from datetime import datetime
class Account(object):
def __init__(self, acct_path, parse_extra=None):
self.fh = open(acct_path) = {}
self.parse_extra = parse_extra
self.fields_encountered = []
def parse(self):
for line in self.fh:
if line[0] == "#":
# Skip comments
j = self.parse_job(line)
if j is not None:
ja = self.annotate_job(j)
j.update(ja)[j["jobnumber"]] = j[j["jobnumber"]]["extra"] = {}
if self.parse_extra:[j["jobnumber"]]["extra"] = self.parse_extra(j, ja)
def parse_job(self, line):
Given a valid line from an SGE accounting file, return a dictionary
of each of the fields as a key. Values are parsed as described in
fields = line.strip().split(":")
return {
"qname": fields[0],
"hostname": fields[1],
"group": fields[2],
"owner": fields[3],
"jobname": fields[4],
"jobnumber": int(fields[5]),
"account": fields[6],
"priority": float(fields[7]),
"qsub_time": int(fields[8]),
"start_time": int(fields[9]),
"end_time": int(fields[10]),
"failed": int(fields[11]),
"exit_status": int(fields[12]),
"ru_wallclock": float(fields[13]),
"ru_utime": float(fields[14]),
"ru_stime": float(fields[15]),
"ru_maxrss": float(fields[16]),
"ru_ixrss": float(fields[17]),
"ru_ismrss": float(fields[18]),
"ru_idrss": float(fields[19]),
"ru_isrss": float(fields[20]),
"ru_minflt": float(fields[21]),
"ru_majflt": float(fields[22]),
"ru_nswap": float(fields[23]),
"ru_inblock": float(fields[24]),
"ru_oublock": float(fields[25]),
"ru_msgsnd": float(fields[26]),
"ru_msgrcv": float(fields[27]),
"ru_nsignals": float(fields[28]),
"ru_nvcsw": float(fields[29]),
"ru_nivcsw": float(fields[30]),
"project": fields[31],
"department": fields[32],
"granted_pe": fields[33],
"slots": int(fields[34]),
"taskid": int(fields[35]),
"cpu": float(fields[36]),
"mem": float(fields[37]),
"io": float(fields[38]),
"category": self.parse_category(fields[39]),
"iow": float(fields[40]),
"pe_taskid": int(fields[41]) if fields[41] != "NONE" else None,
"maxvmem": float(fields[42]),
"arid": fields[43],
"ar_submission_time": fields[44]
except IndexError:
sys.stderr.write("[WARN] Seemingly invalid job line encountered. Skipping.\n")
return None
def parse_category(category_str):
Parse the queue 'category' field as found in a job line, typically
containing additional options specified to the queue on submission,
including runtime and memory resource requests.
# NOTE I'm not overly happy with the parsing here but wanted to avoid
# importing the re library and doing a bunch of regular expressions.
fields = category_str.split("-")
req_usergroup = "default"
req_l = {"h_vmem": None, "h_rt": None, "h_stack": None}
req_queues = []
for field in fields:
if len(field) == 0:
# First field will always be empty as line (should) begin with delimiter
if field[0:2] == "pe":
# Expected format: "-pe multithread <int>"
# NOTE Must appear before a check on field[0] == 'p'
# NOTE key:value not stored as job_dict['slots'] already has the same data
# but block here suppresses "unknown string data" error in else.
elif field[0] == "U":
# Expected format: "-U <str>"
req_usergroup = field.split(" ")[1]
elif field[0] == "l":
# Expected format: "-l h_vmem=<float>[Mm|Gg],h_stack=<float>[Mm|Gg],h_rt=<int>"
for sub_field in field[1:].strip().split(","):
key, value = sub_field.strip().split("=")
if key.lower() == "h_stack" or key.lower() == "h_vmem":
# Convert value to MB
if value[-1].upper() == "G":
#TODO Multiply by 1000 or 1024?
value = float(value[:-1]) * 1024
elif value[-1].upper() == "M":
value = float(value[:-1])
elif value[-1].isdigit() is True:
# Convert bytes to MB
value = float(value) / 1000000
sys.stderr.write("[WARN] Unknown unit of memory encountered parsing -l subfield in queue category field: %s\n" % value[-1].upper())
sys.stderr.write(" %s\n" % field.strip())
elif key.lower() == "h_rt":
value = int(value)
sys.stderr.write("[WARN] Unknown subfield encountered parsing queue category field: %s\n" % key)
sys.stderr.write(" %s\n" % field.strip())
# Insert the new key:value pair to the req_l dict
req_l[key] = value
elif field[0] == "q":
# Expected format: "-q <str1>,<str2>,...,<strN>"
req_queues = [f.strip() for f in field[1:].split(",")[0:]]
sys.stderr.write("[WARN] Unknown queue category string field: %s\n" % field)
return {
"req_usergroup": req_usergroup,
"req_l": req_l,
"req_queues": req_queues,
def annotate_job(job_dict):
Given a job dict as created by `parse_job`, return a dict of potentially
useful additional metadata.
mem_req gibibytes of memory requested per slot
mem_req_tot gibibytes of memory requested across all slots
mem_used maximum memory used in gibibytes
mem_diff mem_req_tot - mem_used
mem_pct percentage of memory requested in respect to amount needed
qsub_dt qsub_time as datetime
start_dt start_time as datetime
end_dt end_time as datetime
time_taken end_time - start_time (as unix timestamp)
time_taken_td time_taken as datetime timedelta
mem_req = (job_dict["category"]["req_l"]["h_vmem"]/1024)
mem_req_tot = mem_req * job_dict["slots"]
mem_used = (job_dict["maxvmem"]/1000000000)*0.931323
mem_diff = mem_req_tot - mem_used
mem_pct = (mem_req_tot / mem_used) * 100
except ZeroDivisionError:
mem_pct = 0
start_dt = datetime.fromtimestamp(job_dict["start_time"])
end_dt = datetime.fromtimestamp(job_dict["end_time"])
time_taken_td = end_dt - start_dt
time_taken = job_dict["end_time"] - job_dict["start_time"]
return {
"mem": {
"mem_req": mem_req,
"mem_req_tot": mem_req_tot,
"mem_used": mem_used,
"mem_diff": mem_diff,
"mem_pct": mem_pct
"time": {
"qsub_dt": datetime.fromtimestamp(job_dict["qsub_time"]),
"start_dt": datetime.fromtimestamp(job_dict["start_time"]),
"end_dt": datetime.fromtimestamp(job_dict["end_time"]),
"time_taken": time_taken,
"time_taken_td": time_taken_td
def print_job(self, jid, fmt_str):
fields = fmt_str.split(":")
job =[jid]
out_str = []
for field in fields:
if "." in field:
field, subfield = field.split(".")
value = job.get(field, {}).get(subfield, None)
value = job.get(field, None)
if value is not None:
#TODO Will ignore fields which genuinely are allowed to be None
if type(value) is float:
if field not in self.fields_encountered:
sys.stderr.write("[WARN] Field '%s' not found in job object.\n" % field)
return "\t".join(out_str)
import argparse
import sys
from acct_parse import Account
# Example
# python ~/scripts/ "jobnumber:exit_status:mem.mem_req_tot:mem.mem_used:time.time_taken_td" --start 1235367 --end 1236460 [--failed]
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("fmt_string", default="jobnumber:hostname:jobname:exit_status")
parser.add_argument("--acct_path", default="/cm/shared/apps/sge/6.2u5p2/default/common/accounting")
parser.add_argument("--start", type=int)
parser.add_argument("--end", type=int)
parser.add_argument("-f", "--failed", action="store_true")
args = parser.parse_args()
if args.start and args.end:
if args.start > args.end:
print("[FAIL] --start larger than --end, no jobs will be matched.")
acct = Account(args.acct_path)
for jid, job in sorted(
jid = int(jid)
if args.start:
if jid < args.start:
if args.end:
if jid > args.end:
if args.failed:
if job["exit_status"] == 0:
print acct.print_job(jid, args.fmt_string)
from acct_parse import Account
if __name__ == "__main__":
acct_path = "/cm/shared/apps/sge/6.2u5p2/default/common/accounting"
def parse_extra(j, ja):
hours_taken = ja["time"]["time_taken_td"].total_seconds() / (60*60)
ram_hours = ja["mem"]["mem_req"] * hours_taken
wasted_ram_hours = ja["mem"]["mem_diff"] * hours_taken
return {
"jobnumber": j["jobnumber"],
"hours_taken": hours_taken,
"ram_hours": ram_hours,
"wasted_ram_hours": wasted_ram_hours
acct = Account(acct_path, parse_extra=parse_extra)
print "jid\tnode\tname\tuser\tgbmem_req\tgbmem_used\tdelta_gbmem\tpct_mem\ttime\tgigaram_hours\twasted_gigaram_hours\texit"
for job in sorted(, key=lambda x:x[1]["extra"]['wasted_ram_hours']):
jid = job[0]
print acct.print_job(jid, "jobnumber:hostname:jobname:owner:mem.mem_req_tot:mem.mem_used:mem.mem_diff:mem.mem_pct:time.time_taken_td:extra.ram_hours:extra.wasted_ram_hours:exit_status")
Copy link


This seems to be a very good solution on parsing SGE accounting file. Is there a guide on how-to use it? I tried the script (i.e. python ./ /data/hpc/sge/xdmod/accounting_in_pieces/_acctae ) , but it did not return anything.


Copy link

Just want to say thanks. It worked beautifully!!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment