Created
March 3, 2011 15:55
-
-
Save anderbubble/852985 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
janderson@cm1:~> cat /usr/local/bin/email-reports | |
#!/usr/bin/env python | |
import ldap | |
import textwrap | |
import os | |
import subprocess | |
from docutils.core import publish_parts | |
from email.MIMEMultipart import MIMEMultipart | |
from email.MIMEText import MIMEText | |
import smtplib | |
import logging | |
from logging.handlers import SysLogHandler | |
import csv | |
import sys | |
from datetime import datetime, timedelta | |
import time | |
from decimal import Decimal | |
from StringIO import StringIO | |
import loadl.model | |
from loadl.model import JobStep | |
import optparse | |
from cbank import Allocation, Session, Project, Resource, Charge, Refund | |
from sqlalchemy import func, and_, create_engine | |
# Database holding LoadLeveler job-step records (queried by job_counts and
# last_jobs).  The URL can be overridden with the LOADL_DB_URL environment
# variable so real credentials need not live in this file; the default is
# the previously hard-coded value.
loadl.model.Session.configure(bind=create_engine(
    os.environ.get("LOADL_DB_URL", "postgresql://user:password@fen4-a/loadl")))
# Report activity goes to syslog (SysLogHandler with its default address),
# formatted as "reports: <message>".
log = logging.getLogger("reports")
log.setLevel(logging.INFO)
syslog = SysLogHandler()
formatter = logging.Formatter('%(name)s: %(message)s')
syslog.setFormatter(formatter)
log.addHandler(syslog)
# Shared LDAP connection used by every directory lookup in this script,
# bound anonymously (empty DN and password).
l = ldap.open("ldap0.shaheen.kaust.edu.sa")
l.simple_bind_s("", "")
class NoOwner (Exception):
    """Raised when a project has no usable owner (PI) in LDAP."""
# Command-line interface; main() consumes the parsed options.
parser = optparse.OptionParser()
parser.add_option(
    "-m", "--master",
    metavar="ADDRESS", dest="master",
    help="send the master report to ADDRESS")
parser.add_option(
    "-p", "--pi",
    dest="send_pi_report", default=False, action="store_true",
    help="send the PI report to all PIs")
parser.add_option(
    "-s", "--start-period",
    metavar="START", dest="start",
    help="report period START")
parser.add_option(
    "-e", "--end-period",
    metavar="END", dest="end",
    help="report period END")
parser.add_option(
    "-n", "--no-op",
    dest="sendmail", default=True, action="store_false",
    help="don't send email: write to stderr")
def main ():
    """Build the requested reports and mail them (or print them with -n).

    The report period is [start, end).  Without -s/-e it defaults to the
    previous calendar month: start is the first day of last month, end is
    the first day of the current month.  With -m the master report goes to
    the given address; with -p every project's PI gets a project report.
    Projects without an owner are skipped (and logged).
    """
    options, _ = parser.parse_args()
    now = datetime.now()
    first_of_this_month = datetime(now.year, now.month, 1)
    if options.start:
        start = parse_date(options.start)
    else:
        # The day before the 1st of this month falls in last month.
        last_month = first_of_this_month - timedelta(days=1)
        start = datetime(last_month.year, last_month.month, 1)
    if options.end:
        end = parse_date(options.end)
    else:
        end = first_of_this_month
    Report.populate_caches(start, end)
    if options.master:
        report = MasterReport(start, end)
        if options.sendmail:
            sendmail(report.email, options.master)
            log.info("sent master report to %s" % options.master)
        else:
            sys.stderr.write(str(report.email))
            log.info("wrote master report for %s to stderr (not sent)" % options.master)
    if options.send_pi_report:
        for project in get_projects():
            project_id = project[1]['cn'][0]
            report = ProjectReport(project, start, end)
            try:
                # Building the message resolves the PI first, which raises
                # NoOwner when the project has no owner to report to.
                message = report.email
            except NoOwner:
                log.warning("skipped report for project '%s': no owner" % project_id)
                continue
            if options.sendmail:
                sendmail(message)
                # The To header already holds the PI; no second LDAP lookup.
                log.info("sent report for project '%(project)s' to '%(address)s'" % {
                    'project': project_id, 'address': message['To']})
            else:
                sys.stderr.write(str(message))
def parse_date (date_string):
    """Parse a ``YYYY-MM-DD`` string into a ``datetime`` at midnight.

    Raises ValueError when *date_string* does not match that format.
    """
    return datetime.strptime(date_string, "%Y-%m-%d")
def sendmail (message, destination=None):
    """Deliver *message* through the MTA on localhost.

    :param message: an email message with at least a ``From`` header.
    :param destination: recipient address.  When given, it is also set as
        the ``To`` header if the message has none.  When omitted, the
        message's existing ``To`` header is used as the recipient.
    :raises ValueError: if there is neither a *destination* nor a ``To``
        header, before any SMTP connection is made.
    """
    if destination:
        if not message['To']:
            message['To'] = destination
    else:
        destination = message['To']
    if not destination:
        raise ValueError("no destination for message %r" % message['Subject'])
    mta = smtplib.SMTP("localhost")
    try:
        mta.sendmail(message['From'], [destination], message.as_string())
    finally:
        # Always end the session, even when delivery failed; if the server
        # has already dropped the connection, just close the socket.
        try:
            mta.quit()
        except smtplib.SMTPException:
            mta.close()
class Report (object):
    """Common machinery for the KSL usage reports.

    A report covers the period [start, end).  This base class holds the
    shared cache and renders the subclass-provided ``rst`` text to HTML.
    It also provides the CSV row writers and the cbank accounting queries
    (allocations, charges, refunds).  Subclasses define ``rst`` and ``email``.
    """

    # Class attribute, shared by every report.  populate_caches() fills it
    # once per run so per-period data is not re-fetched for each project.
    cache = {}

    def __init__ (self, start, end):
        self.start = start
        self.end = end

    def _get_html (self):
        """Render the subclass's ``rst`` text to an HTML body fragment."""
        return publish_parts(self.rst, writer_name="html")['html_body']
    html = property(_get_html)

    @classmethod
    def populate_caches (cls, start, end):
        """Fetch the data every report for the period needs into ``cls.cache``.

        'job-count'    -- resource -> {account: job steps completed in [start, end)}
        'job-dates'    -- resource -> {account: latest completion time before end}
        'storage-used' -- filesystem -> mapping returned by tsgetusage()
        'disabled'     -- memberUid values of the LDAP "disabled" group
        """
        cls.cache['job-count'] = {
            'shaheenaccess': job_counts(start, end, bg=True),
            'neseraccess': job_counts(start, end, bg=False)}
        cls.cache['job-dates'] = {
            'shaheenaccess': last_jobs(end, bg=True),
            'neseraccess': last_jobs(end, bg=False)}
        cls.cache['storage-used'] = {
            '/project': tsgetusage("/project"),
            '/scratch': tsgetusage("/scratch")}
        cls.cache['disabled'] = group_members("disabled")

    def _project_fields (self, project):
        """Return (project id, PI, project name) for an LDAP project entry.

        The PI is "" when the project has no owner (get_project_pi raised
        NoOwner).
        """
        project_id = project[1]['cn'][0]
        try:
            project_pi = get_project_pi(project)
        except NoOwner:
            project_pi = ""
        project_name = project[1]['name'][0]
        return project_id, project_pi, project_name

    def _write_csv_header (self, f=None):
        """Write the resource-use CSV heading row to *f* and return *f*.

        A fresh StringIO is used when *f* is None.
        """
        if f is None:
            f = StringIO()
        csv.writer(f).writerow([
            "Resource",
            "Project ID",
            "Project PI",
            "Project name",
            "Job steps run during period",
            "Date of last job by end of period",
            "Total allocation at end of period",
            "Total resource use as of end of period",
            "Opening balance",
            "New allocation during period",
            "Resource use during period",
            "Closing balance"])
        return f

    def _write_computational_csv (self, project, f=None):
        """Write one resource-use row per compute resource for *project*.

        Columns follow _write_csv_header.  cbank amounts are converted with
        the resource's converter (convert_bg / convert_x86).  Returns *f*,
        or a fresh StringIO when *f* is None.
        """
        if f is None:
            f = StringIO()
        writer = csv.writer(f)
        project_id, project_pi, project_name = self._project_fields(project)
        for resource_id, convert in [("shaheenaccess", convert_bg), ("neseraccess", convert_x86)]:
            writer.writerow([
                resource_id,
                project_id,
                project_pi,
                project_name,
                self.cache['job-count'][resource_id].get(project_id, 0),
                self.cache['job-dates'][resource_id].get(project_id, ""),
                convert(self.total_allocation(project_id, resource_id)),
                convert(self.time_used(project_id, resource_id, total=True)),
                convert(self.balance(project_id, resource_id, self.start)),
                convert(self.new_allocation(project_id, resource_id)),
                convert(self.time_used(project_id, resource_id, total=False)),
                convert(self.balance(project_id, resource_id, self.end))])
        return f

    def _write_storage_csv (self, project, f=None):
        """Write one row per filesystem with *project*'s storage use.

        Only the identity columns and "Total resource use as of end of
        period" (cached tsgetusage figure passed through convert_tb) are
        filled; the remaining columns are empty.  Returns *f*, or a fresh
        StringIO when *f* is None.
        """
        if f is None:
            f = StringIO()
        writer = csv.writer(f)
        project_id, project_pi, project_name = self._project_fields(project)
        for fs in ["/scratch", "/project"]:
            used = self.cache['storage-used'][fs].get(project_id, 0)
            writer.writerow([
                fs,
                project_id,
                project_pi,
                project_name,
                "",
                "",
                "",
                convert_tb(used),
                "",
                "",
                "",
                ""])
        return f

    def _write_user_csv_header (self, f=None):
        """Write the user CSV heading row to *f* and return *f*."""
        if f is None:
            f = StringIO()
        csv.writer(f).writerow([
            "uid",
            "cn",
            "mail",
            "status"])
        return f

    def _write_user_csv (self, user, f=None):
        """Write a (uid, cn, mail, status) row for an LDAP user entry.

        Missing attributes are written as "".  status is "unknown" without
        a uid, "disabled" when the uid is in cache['disabled'], else "".
        Returns *f*, or a fresh StringIO when *f* is None.
        """
        if f is None:
            f = StringIO()

        def first_value (name):
            # First value of an LDAP attribute, or "" when absent/empty.
            try:
                return user[1][name][0]
            except (KeyError, IndexError):
                return ""

        uid = first_value('uid')
        cn = first_value('cn')
        mail = first_value('mail')
        if not uid:
            status = "unknown"
        elif uid in self.cache['disabled']:
            status = "disabled"
        else:
            status = ""
        csv.writer(f).writerow([
            uid,
            cn,
            mail,
            status])
        return f

    def total_allocation (self, project_id, resource_id):
        """Sum of the project's allocations on the resource starting before the period end."""
        allocations = Session.query(Allocation).filter(and_(
            Allocation.project_id==Project.fetch(project_id).id,
            # The resource id must come from Resource, not Project.
            Allocation.resource_id==Resource.fetch(resource_id).id,
            Allocation.start < self.end)).all()
        return sum(allocation.amount for allocation in allocations)

    def time_used (self, project_id, resource_id, total=False):
        """Net charges (charges minus their refunds) on the project's allocations.

        Counts charges dated before the period end; with total=False only
        those on or after the period start.  Returns 0 when there are none.
        """
        project = Project.fetch(project_id)
        resource = Resource.fetch(resource_id)
        filters = [
            Allocation.project_id==project.id,
            Allocation.resource_id==resource.id,
            Charge.datetime < self.end]
        if not total:
            filters.append(Charge.datetime >= self.start)
        filter_ = and_(*filters)
        charged = Session.query(func.sum(Charge.amount)).join(Charge.allocation).filter(filter_)[0][0]
        refunded = Session.query(func.sum(Refund.amount)).join(Refund.charge, Charge.allocation).filter(filter_)[0][0]
        # SUM over no rows is NULL -> None, hence the "or 0".
        return (charged or 0) - (refunded or 0)

    def balance (self, project_id, resource_id, date):
        """Balance at *date* of the project's allocations active at that moment.

        Allocated amount (allocations with start < date < end) minus charges
        dated <= date against them, plus refunds of those charges.
        NOTE(review): charges exactly at *date* are counted here, while
        time_used() excludes charges at the period end -- confirm intended.
        """
        project = Project.fetch(project_id)
        resource = Resource.fetch(resource_id)
        filter_ = and_(
            Allocation.project_id==project.id,
            Allocation.resource_id==resource.id,
            Allocation.start < date,
            Allocation.end > date)
        allocated = Session.query(func.sum(Allocation.amount)).filter(filter_)[0][0]
        charged = Session.query(func.sum(Charge.amount)).join(Charge.allocation).filter(filter_).filter(Charge.datetime<=date)[0][0]
        refunded = Session.query(func.sum(Refund.amount)).join(Refund.charge, Charge.allocation).filter(filter_).filter(Charge.datetime<=date)[0][0]
        return (allocated or 0) - (charged or 0) + (refunded or 0)

    def new_allocation (self, project_id, resource_id):
        """Sum of the project's allocations on the resource starting within [start, end)."""
        allocations = Session.query(Allocation).filter(and_(
            Allocation.project_id==Project.fetch(project_id).id,
            # The resource id must come from Resource, not Project.
            Allocation.resource_id==Resource.fetch(resource_id).id,
            Allocation.start >= self.start, Allocation.start < self.end))
        return sum(allocation.amount for allocation in allocations)
def tsgetusage (fs):
    """Return the per-group usage of filesystem *fs* reported by tsgetusage.

    Runs ``/usr/local/bin/tsgetusage -q -g FS`` on fs1-a over ssh.  For each
    output line, the second whitespace-separated field is mapped to the
    third, and values are kept as strings.  Presumably these are the group
    (project) name and its usage; the unit is not established here (see
    convert_tb).  Lines with fewer than three fields are ignored.  A
    non-zero exit status is logged, and whatever was parsed is returned.
    """
    exec_host = "fs1-a"
    process = subprocess.Popen(
        ["ssh", exec_host, "/usr/local/bin/tsgetusage", "-q", "-g", fs],
        stdout=subprocess.PIPE)
    # communicate() drains stdout and waits, so the ssh child is reaped.
    output = process.communicate()[0]
    if process.returncode != 0:
        log.warning("tsgetusage %s on %s exited with status %s" % (
            fs, exec_host, process.returncode))
    usage = {}
    for line in output.splitlines():
        fields = line.split()
        if len(fields) >= 3:
            usage[fields[1]] = fields[2]
    return usage
def group_members (group):
    """Return the memberUid values of the LDAP group *group*.

    The group is looked up as cn=<group>,ou=group,dc=shaheen,dc=kaust,
    dc=edu,dc=sa.  Returns [] when the group does not exist or has no
    memberUid attribute.  A single string value is wrapped in a list.
    """
    group_dn = "cn=%s,ou=group,dc=shaheen,dc=kaust,dc=edu,dc=sa" % group
    try:
        member_uids = l.search_s(group_dn, ldap.SCOPE_BASE)[0][1]['memberUid']
    except (IndexError, KeyError, ldap.NO_SUCH_OBJECT):
        return []
    if isinstance(member_uids, str):
        return [member_uids]
    return member_uids
def job_counts (start, end, bg=False):
    """Count job steps per account that completed in [start, end).

    With bg=True only steps whose type is "bluegene" are counted, otherwise
    only steps of any other type.  (Rows with a NULL type match neither SQL
    comparison.)  Returns {account: count}.
    """
    if bg:
        type_filter = JobStep.type == "bluegene"
    else:
        type_filter = JobStep.type != "bluegene"
    in_period = and_(JobStep.completion_time >= start, JobStep.completion_time < end)
    rows = (loadl.model.Session.query(JobStep.account, func.count(JobStep._id))
            .filter(in_period)
            .filter(type_filter)
            .group_by(JobStep.account))
    counts = {}
    for account, count in rows:
        counts[account] = count
    return counts
def last_jobs (end, bg=False):
    """Return {account: latest completion time} over job steps completed before *end*.

    bg=True restricts to steps of type "bluegene"; bg=False to all other
    types.
    """
    if bg:
        type_filter = JobStep.type == "bluegene"
    else:
        type_filter = JobStep.type != "bluegene"
    latest = func.max(JobStep.completion_time)
    rows = (loadl.model.Session.query(JobStep.account, latest)
            .filter(JobStep.completion_time < end)
            .filter(type_filter)
            .group_by(JobStep.account))
    result = {}
    for account, completed in rows:
        result[account] = completed
    return result
class MasterReport (Report):
    """Site-wide report: resource use of every project and a list of all users."""

    # NOTE(review): 1000^3 bytes is a gigabyte.  convert_tb divides by
    # 1000**3, so the figures are terabytes only if tsgetusage reports
    # kilobytes -- confirm the unit before relying on this sentence.
    rst_template = textwrap.dedent("""\
        =================
        KSL status report
        =================

        :Period start date: %(start)s
        :Period end date: %(end)s (exclusive)

        This report indicates the current use of KSL resources for all
        projects, and the status of all defined users. Pertinent data
        is included in the attached files, presented in csv format.

        Use of computational systems (e.g., the Blue Gene) is reported
        in core-hours. Use of storage systems (e.g., the /scratch
        filesystem) is reported in terabytes (1000^3 bytes).

        Comments or questions concerning the data presented here
        should be sent to the KAUST Supercomputing Laboratory
        helpdesk, help@hpc.kaust.edu.sa.""")

    def send (self, destination):
        """Mail this report to *destination* through the local MTA."""
        sendmail(self.email, destination)

    def _get_rst (self):
        """The report text (reStructuredText) with the period dates filled in."""
        start_s = format_date(self.start)
        end_s = format_date(self.end)
        return self.rst_template % {'start':start_s, 'end':end_s}
    rst = property(_get_rst)

    def _get_csv (self):
        """StringIO holding the resource-use CSV for every LDAP project."""
        buf = StringIO()
        self._write_csv_header(f=buf)
        for project in get_projects():
            self._write_computational_csv(project, f=buf)
            self._write_storage_csv(project, f=buf)
        return buf
    csv = property(_get_csv)

    def _get_user_csv (self):
        """StringIO holding the user CSV for every kslAccount in LDAP."""
        buf = StringIO()
        self._write_user_csv_header(f=buf)
        for user in get_users():
            self._write_user_csv(user, f=buf)
        return buf
    user_csv = property(_get_user_csv)

    def _get_email (self):
        """Build the MIME message: plain/HTML body plus two CSV attachments.

        No To header is set here; sendmail() supplies it from its
        destination argument.
        """
        end_s = format_date(self.end)
        message = MIMEMultipart()
        message['Subject'] = "KSL status report for the period ending %s" % end_s
        message['From'] = "KAUST Supercomputing Laboratory <help@hpc.kaust.edu.sa>"
        body = MIMEMultipart("alternative")
        body.attach(MIMEText(self.rst, "plain"))
        body.attach(MIMEText(self.html, "html"))
        message.attach(body)
        resource_attachment = MIMEText(self.csv.getvalue())
        resource_attachment.add_header('Content-Disposition', 'attachment',
            filename="resource_use_%s.csv" % end_s)
        resource_attachment.set_type("text/csv")
        message.attach(resource_attachment)
        user_attachment = MIMEText(self.user_csv.getvalue())
        user_attachment.add_header('Content-Disposition', 'attachment',
            filename="users_%s.csv" % end_s)
        user_attachment.set_type("text/csv")
        message.attach(user_attachment)
        return message
    email = property(_get_email)
class ProjectReport (Report):
    """Per-project report addressed to the project's PI."""

    rst_template = textwrap.dedent("""\
        =================
        KSL status report
        =================

        :Period start date: %(start)s
        :Period end date: %(end)s (exclusive)

        This report indicates the membership of and use of KSL
        resources by project %(project)s.

        Additional data is provided in the attached csv file.

        Comments or questions concerning the data presented here should be
        sent to the KAUST Supercomputing Laboratory helpdesk,
        help@hpc.kaust.edu.sa.

        Project members
        ===============

        %(members)s

        Computational allocations
        =========================

        This data can be obtained on-demand from a KSL front-end node
        using the ``cbank`` command::

          cbank -p %(project)s -a %(start)s -b %(end)s

        Shaheen (Blue Gene)
        -------------------

        %(bluegene_cpu)s

        Neser (Cluster)
        ---------------

        %(cluster_cpu)s

        Storage used
        ============

        This data can be obtained on-demand from a KSL front-end node
        using the ``du`` command::

          du -xhs /project/%(project)s /scratch/project/%(project)s

        /project
        --------

        %(project_data)s TB

        /scratch/project
        ----------------

        %(project_scratch)s TB""")

    def __init__ (self, project, start, end):
        """:param project: the project's LDAP (dn, attributes) entry."""
        Report.__init__(self, start, end)
        self.project = project

    def _get_rst (self):
        """The report text (reStructuredText) for this project and period."""
        project_id = self.project[1]['cn'][0]
        start_s = format_date(self.start)
        end_s = format_date(self.end)
        return self.rst_template % {
            'project':project_id,
            'members':self.member_rst(),
            'bluegene_cpu':self.cpu_rst("shaheenaccess"),
            'cluster_cpu':self.cpu_rst("neseraccess"),
            'project_data':convert_tb(self.cache['storage-used']['/project'].get(project_id, 0)),
            'project_scratch':convert_tb(self.cache['storage-used']['/scratch'].get(project_id, 0)),
            'start':start_s, 'end':end_s}
    rst = property(_get_rst)

    def member_rst (self):
        """Bullet list of members: active ones first, then disabled ones marked as such."""
        # self.members performs one LDAP lookup per member on every access,
        # so fetch the list once.
        members = list(self.members)
        active = [format_member(member) for member in members
                  if not self.is_disabled(member)]
        disabled = [format_member(member, disabled=True) for member in members
                    if self.is_disabled(member)]
        return os.linesep.join(active + disabled)

    def is_disabled (self, user):
        """True if the user's uid is in the cached "disabled" group (False without a uid)."""
        try:
            uid = user[1]['uid'][0]
        except (KeyError, IndexError):
            return False
        return uid in self.cache['disabled']

    def _get_email (self):
        """Build the MIME message to the PI: plain/HTML body plus two CSV attachments.

        Raises NoOwner (from the PI lookup) when the project has no owner.
        """
        end_s = format_date(self.end)
        project_id = self.project[1]['cn'][0]
        message = MIMEMultipart()
        message['Subject'] = "KSL status report for %s" % project_id
        message['From'] = "KAUST Supercomputing Laboratory <help@hpc.kaust.edu.sa>"
        message['To'] = self.pi
        body = MIMEMultipart("alternative")
        body.attach(MIMEText(self.rst, "plain"))
        body.attach(MIMEText(self.html, "html"))
        message.attach(body)
        resource_attachment = MIMEText(self.csv.getvalue())
        resource_attachment.add_header('Content-Disposition', 'attachment',
            filename="resource_use_%s_%s.csv" % (project_id, end_s))
        resource_attachment.set_type("text/csv")
        message.attach(resource_attachment)
        member_attachment = MIMEText(self.user_csv.getvalue())
        member_attachment.add_header('Content-Disposition', 'attachment',
            filename="members_%s_%s.csv" % (project_id, end_s))
        member_attachment.set_type("text/csv")
        message.attach(member_attachment)
        return message
    email = property(_get_email)

    def _get_members (self):
        """Generator of the project's member LDAP entries (re-queried on each access)."""
        return get_project_members(self.project)
    members = property(_get_members)

    def _get_pi (self):
        """The PI's address string; raises NoOwner if the project has none."""
        return get_project_pi(self.project)
    pi = property(_get_pi)

    def _get_csv (self):
        """StringIO holding this project's resource-use CSV."""
        buf = StringIO()
        self._write_csv_header(f=buf)
        self._write_computational_csv(self.project, f=buf)
        self._write_storage_csv(self.project, f=buf)
        return buf
    csv = property(_get_csv)

    def _get_user_csv (self):
        """StringIO holding the user CSV for this project's members."""
        buf = StringIO()
        self._write_user_csv_header(f=buf)
        for user in self.members:
            self._write_user_csv(user, f=buf)
        return buf
    user_csv = property(_get_user_csv)

    def cpu_rst (self, resource_id):
        """reST field list of job steps, use and closing balance on *resource_id*.

        Raises Exception for a resource other than "shaheenaccess" or
        "neseraccess".
        """
        template = textwrap.dedent("""\
            :Job steps: %(job_steps)s
            :Resources used: %(resources_used)s core-hours
            :Balance: %(balance)s core-hours""")
        converters = {"shaheenaccess": convert_bg, "neseraccess": convert_x86}
        if resource_id not in converters:
            raise Exception("unknown resource %s" % resource_id)
        convert = converters[resource_id]
        project_id = self.project[1]['cn'][0]
        data = {
            'job_steps':
                self.cache['job-count'][resource_id].get(project_id, 0),
            'resources_used':
                convert(self.time_used(project_id, resource_id, total=False)),
            'balance':
                convert(self.balance(project_id, resource_id, self.end))}
        return template % data
def get_project_pi (project):
    """Return the owner (PI) of an LDAP project entry as an address string.

    :param project: a (dn, attributes) pair as returned by get_projects().
    :returns: "CN <mail>" from the owner's LDAP entry.  If the entry has no
        cn, just the mail.  If the owner entry does not exist, the bare
        owner DN.
    :raises NoOwner: if the project has no owner attribute, or the owner's
        entry has no mail address.
    """
    project_dn = project[0]
    project_id = project[1]['cn'][0]
    try:
        owner_dn = l.search_s(project_dn, ldap.SCOPE_BASE)[0][1]['owner'][0]
    except (KeyError, IndexError):
        raise NoOwner("%s has no owner" % project_id)
    try:
        owner = l.search_s(owner_dn, ldap.SCOPE_BASE)[0][1]
    except ldap.NO_SUCH_OBJECT:
        return owner_dn
    try:
        owner_mail = owner['mail'][0]
    except (KeyError, IndexError):
        # Without an address there is nobody to send the report to.
        raise NoOwner("%s owner %s has no mail address" % (project_id, owner_dn))
    owner_cns = owner.get('cn')
    if not owner_cns:
        return owner_mail
    return "%(owner_cn)s <%(owner_mail)s>" % {'owner_mail':owner_mail, 'owner_cn':owner_cns[0]}
def convert_tb (bytes):
    """Divide a raw usage figure by 1000**3 and round to three decimals.

    Accepts anything Decimal() accepts (int or numeric string).
    NOTE(review): this yields terabytes only if the input is in kilobytes;
    for a byte count it is gigabytes -- confirm tsgetusage's unit.
    """
    divisor = 1000 ** 3
    scaled = Decimal(bytes) / divisor
    return round(scaled, 3)
def convert_bg (amount):
    """Convert a Blue Gene charge to whole core-hours: amount * 4 / 3600, rounded.

    NOTE(review): presumably *amount* is in seconds on 4-core units --
    confirm against cbank's charging convention.
    """
    scaled = Decimal(amount) * 4
    hours = scaled / 60 / 60
    return int(round(hours, 0))
def convert_x86 (amount):
    """Convert a cluster (x86) charge to whole core-hours: amount * 8 / 3600, rounded.

    NOTE(review): presumably *amount* is in seconds on 8-core units --
    confirm against cbank's charging convention.
    """
    scaled = Decimal(amount) * 8
    hours = scaled / 60 / 60
    return int(round(hours, 0))
def format_date (date):
    """Render a date or datetime as ``YYYY-MM-DD`` (any time of day is dropped)."""
    return "{0:%Y-%m-%d}".format(date)
def format_member (user, disabled=False):
    """Format an LDAP user entry as a reST bullet item.

    :param user: a (dn, attributes) pair.
    :param disabled: when true, append " (disabled)".
    :returns: "* CN <mail>", or "* CN" when the entry has no mail; the DN
        stands in for CN when the entry has no cn.
    """
    try:
        cn = user[1]['cn'][0]
    except (KeyError, IndexError):
        # No cn: identify the member by its DN instead.
        cn = str(user[0])
    try:
        mail = user[1]['mail'][0]
    except (KeyError, IndexError):
        user_string = cn
    else:
        user_string = "%(cn)s <%(mail)s>" % {'mail':mail, 'cn':cn}
    if disabled:
        user_string += " (disabled)"
    return "* %s" % user_string
def get_projects ():
    """Return all kslProject entries directly below ou=group as (dn, attributes) pairs."""
    base_dn = "ou=group,dc=shaheen,dc=kaust,dc=edu,dc=sa"
    project_filter = "(objectclass=kslProject)"
    return l.search_s(base_dn, ldap.SCOPE_ONELEVEL, project_filter)
def get_project_members (project):
    """Yield the LDAP user entry for each memberUid of *project*.

    Each uid is looked up as uid=<uid>,ou=users,dc=shaheen,dc=kaust,dc=edu,
    dc=sa.  A single string memberUid is treated as a one-element list.  A
    uid with no user entry yields a minimal placeholder entry, (dn,
    {'uid': [uid], 'cn': [uid]}), instead of aborting the whole report.
    """
    member_uids = project[1].get('memberUid', [])
    if isinstance(member_uids, str):
        member_uids = [member_uids]
    for uid in member_uids:
        user_dn = "uid=%s,ou=users,dc=shaheen,dc=kaust,dc=edu,dc=sa" % uid
        try:
            entry = l.search_s(user_dn, ldap.SCOPE_BASE)[0]
        except ldap.NO_SUCH_OBJECT:
            # Stale memberUid: keep the member visible under its uid.
            entry = (user_dn, {'uid': [uid], 'cn': [uid]})
        yield entry
def get_users ():
    """Return all kslAccount entries directly below ou=users as (dn, attributes) pairs."""
    base_dn = "ou=users,dc=shaheen,dc=kaust,dc=edu,dc=sa"
    account_filter = "(objectclass=kslAccount)"
    return l.search_s(base_dn, ldap.SCOPE_ONELEVEL, account_filter)
def get_uids ():
    """Return the first uid value of every kslAccount entry, in get_users() order."""
    uids = []
    for entry in get_users():
        uids.append(entry[1]['uid'][0])
    return uids
def indent (iterable):
    """Concatenate the items of *iterable*, each prefixed by one space.

    No separator is inserted between items; each item is formatted with %s.
    """
    pieces = []
    for line in iterable:
        pieces.append(" %s" % line)
    return "".join(pieces)
# Only run the report job when executed as a script, not when imported.
if "__main__" == __name__:
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment