Skip to content

Instantly share code, notes, and snippets.

@anderbubble
Created March 3, 2011 15:55
Show Gist options
  • Save anderbubble/852985 to your computer and use it in GitHub Desktop.
Save anderbubble/852985 to your computer and use it in GitHub Desktop.
janderson@cm1:~> cat /usr/local/bin/email-reports
#!/usr/bin/env python
import ldap
import textwrap
import os
import subprocess
from docutils.core import publish_parts
from email.MIMEMultipart import MIMEMultipart
from email.MIMEText import MIMEText
import smtplib
import logging
from logging.handlers import SysLogHandler
import csv
import sys
from datetime import datetime, timedelta
import time
from decimal import Decimal
from StringIO import StringIO
import loadl.model
from loadl.model import JobStep
import optparse
from cbank import Allocation, Session, Project, Resource, Charge, Refund
from sqlalchemy import func, and_, create_engine
# Bind the LoadLeveler job-database session to its PostgreSQL engine.
# NOTE(review): the connection URL embeds a user name and password in the
# source; consider reading it from configuration or the environment.
loadl.model.Session.configure(bind=create_engine("postgresql://user:password@fen4-a/loadl"))
# Module logger: INFO and above is forwarded to syslog, each record
# prefixed with the logger name ("reports").
log = logging.getLogger("reports")
log.setLevel(logging.INFO)
syslog = SysLogHandler()
formatter = logging.Formatter('%(name)s: %(message)s')
syslog.setFormatter(formatter)
log.addHandler(syslog)
# Shared LDAP connection used by every directory lookup in this script.
# It is opened and bound (empty DN and password) as soon as the module loads.
l = ldap.open("ldap0.shaheen.kaust.edu.sa")
l.simple_bind_s("", "")
class NoOwner (Exception):
    """Raised when a project's directory entry has no ``owner`` attribute."""
# Command-line interface; the parsed options are consumed by main().
parser = optparse.OptionParser()
# -m/--master ADDRESS: send the master (all projects) report to ADDRESS.
parser.add_option("-m", "--master", dest="master",
    help="send the master report to ADDRESS", metavar="ADDRESS")
# -p/--pi: flag; when given, a per-project report is sent to every PI.
parser.add_option("-p", "--pi",
    action="store_true", dest="send_pi_report", default=False,
    help="send the PI report to all PIs")
# -s/--start-period START: first day of the report period.
parser.add_option("-s", "--start-period", dest="start",
    help="report period START", metavar="START")
# -e/--end-period END: end of the report period.
parser.add_option("-e", "--end-period", dest="end",
    help="report period END", metavar="END")
# -n/--no-op: clears options.sendmail (default True) so that messages are
# written to stderr instead of being mailed.
parser.add_option("-n", "--no-op",
    action="store_false", dest="sendmail", default=True,
    help="don't send email: write to stderr")
def main ():
    """Parse the command line, then build and deliver the requested reports.

    The report period defaults to the previous calendar month: it starts on
    the first day of last month and ends (exclusively) on the first day of
    the current month.  Either bound can be overridden with ``--start-period``
    and ``--end-period`` (``YYYY-MM-DD``).

    With ``--master ADDRESS`` the master report is sent to ADDRESS; with
    ``--pi`` one report per project is sent to that project's PI.  With
    ``--no-op`` messages are written to stderr instead of being mailed.
    """
    options, args = parser.parse_args()

    # Sample the clock once so that the default start and end are derived
    # from the same instant, even if the run straddles a month boundary.
    now = datetime.now()
    this_month = datetime(now.year, now.month, 1)

    if options.start:
        start = parse_date(options.start)
    else:
        last_month = this_month - timedelta(days=1)
        start = datetime(last_month.year, last_month.month, 1)
    if options.end:
        end = parse_date(options.end)
    else:
        end = this_month

    Report.populate_caches(start, end)

    if options.master:
        report = MasterReport(start, end)
        message = report.email
        if options.sendmail:
            sendmail(message, options.master)
        else:
            sys.stderr.write(str(message))
        log.info("sent master report to %s" % options.master)

    if options.send_pi_report:
        for project in get_projects():
            project_id = project[1]['cn'][0]
            report = ProjectReport(project, start, end)
            try:
                # Building the message looks up the PI and raises NoOwner
                # for projects without one.
                message = report.email
                if options.sendmail:
                    sendmail(message)
                else:
                    sys.stderr.write(str(message))
            except NoOwner:
                # A project without an owner has nobody to report to; record
                # the skip rather than dropping it silently.
                log.warning("no report sent for project '%s': no owner" % project_id)
            else:
                log.info("sent report for project '%(project)s' to '%(address)s'" % {
                    'project':project_id, 'address':report.pi})
def parse_date (date_string):
    """Convert a ``YYYY-MM-DD`` string into a ``datetime`` at midnight.

    Raises ValueError (from ``time.strptime``) when the string does not
    match that layout.
    """
    parsed = time.strptime(date_string, "%Y-%m-%d")
    year, month, day, hour, minute, second = parsed[0:6]
    return datetime(year, month, day, hour, minute, second)
def sendmail (message, destination=None):
    """Deliver *message* through the SMTP server on localhost.

    message -- an email message object; its ``From`` header is used as the
        envelope sender.
    destination -- envelope recipient.  When given and the message has no
        ``To`` header, the header is set to it; when omitted, the message's
        ``To`` header is used as the recipient.

    Raises ValueError when no recipient can be determined, and whatever
    ``smtplib`` raises on connection or delivery failure.
    """
    if destination:
        if not message['To']:
            message['To'] = destination
    else:
        destination = message['To']
    if not destination:
        # Fail before opening a connection instead of handing [None] to smtplib.
        raise ValueError("no destination address given and message has no 'To' header")
    mta = smtplib.SMTP("localhost")
    try:
        mta.sendmail(message['From'], [destination], message.as_string())
    finally:
        # Always end the session so a failed delivery does not leak the socket.
        try:
            mta.quit()
        except smtplib.SMTPServerDisconnected:
            # The server already dropped the connection; nothing left to close.
            pass
class Report (object):
    """Common base for the usage reports.

    Holds the report period and provides the shared CSV row writers and the
    cbank accounting queries.  The ``html`` property renders ``self.rst``
    with docutils, so subclasses are expected to provide an ``rst`` attribute.
    """

    # Data gathered once per run by populate_caches() and shared by every
    # report instance (deliberately class-level).
    cache = {}

    def __init__ (self, start, end):
        """Remember the report period: *start* inclusive, *end* exclusive."""
        self.start = start
        self.end = end

    def _get_html (self):
        """Render the report's reStructuredText as an HTML body fragment."""
        return publish_parts(self.rst, writer_name="html")['html_body']
    html = property(_get_html)

    @classmethod
    def populate_caches (cls, start, end):
        """Fill the shared cache with job, storage and disabled-user data.

        Job counts cover [start, end); last-job dates and everything else are
        as of *end* or of the moment the command runs.
        """
        cls.cache['job-count'] = {
            'shaheenaccess': job_counts(start, end, bg=True),
            'neseraccess': job_counts(start, end, bg=False)}
        cls.cache['job-dates'] = {
            'shaheenaccess': last_jobs(end, bg=True),
            'neseraccess': last_jobs(end, bg=False)}
        cls.cache['storage-used'] = {
            '/project': tsgetusage("/project"),
            '/scratch': tsgetusage("/scratch")}
        cls.cache['disabled'] = group_members("disabled")

    @staticmethod
    def _first_value (entry, attribute):
        """Return the first value of *attribute* in an LDAP entry, or ""."""
        try:
            return entry[1][attribute][0]
        except (KeyError, IndexError):
            return ""

    def _write_csv_header (self, f=None):
        """Write the resource-use column headings to *f* and return *f*.

        A new StringIO is created (and returned) when *f* is omitted.
        """
        if f is None:
            f = StringIO()
        csv.writer(f).writerow([
            "Resource",
            "Project ID",
            "Project PI",
            "Project name",
            "Job steps run during period",
            "Date of last job by end of period",
            "Total allocation at end of period",
            "Total resource use as of end of period",
            "Opening balance",
            "New allocation during period",
            "Resource use during period",
            "Closing balance"])
        return f

    def _write_computational_csv (self, project, f=None):
        """Write one row per computational resource for *project*; return *f*.

        project -- an LDAP ``(dn, attributes)`` entry.
        The PI column is left empty when the project has no owner.
        """
        if f is None:
            f = StringIO()
        writer = csv.writer(f)
        project_id = project[1]['cn'][0]
        try:
            project_pi = get_project_pi(project)
        except NoOwner:
            project_pi = ""
        project_name = project[1]['name'][0]
        for resource_id, convert in [("shaheenaccess", convert_bg), ("neseraccess", convert_x86)]:
            writer.writerow([
                resource_id,
                project_id,
                project_pi,
                project_name,
                self.cache['job-count'][resource_id].get(project_id, 0),
                self.cache['job-dates'][resource_id].get(project_id, ""),
                convert(self.total_allocation(project_id, resource_id)),
                convert(self.time_used(project_id, resource_id, total=True)),
                convert(self.balance(project_id, resource_id, self.start)),
                convert(self.new_allocation(project_id, resource_id)),
                convert(self.time_used(project_id, resource_id, total=False)),
                convert(self.balance(project_id, resource_id, self.end))])
        return f

    def _write_storage_csv (self, project, f=None):
        """Write one row per filesystem for *project*; return *f*.

        Only the "total resource use" column is filled in; the columns that
        apply to computational resources only are left empty.
        """
        if f is None:
            f = StringIO()
        writer = csv.writer(f)
        project_id = project[1]['cn'][0]
        try:
            project_pi = get_project_pi(project)
        except NoOwner:
            project_pi = ""
        project_name = project[1]['name'][0]
        for fs in ["/scratch", "/project"]:
            writer.writerow([
                fs,
                project_id,
                project_pi,
                project_name,
                "",
                "",
                "",
                convert_tb(self.cache['storage-used'][fs].get(project_id, 0)),
                "",
                "",
                "",
                ""])
        return f

    def _write_user_csv_header (self, f=None):
        """Write the user-listing column headings to *f* and return *f*."""
        if f is None:
            f = StringIO()
        csv.writer(f).writerow([
            "uid",
            "cn",
            "mail",
            "status"])
        return f

    def _write_user_csv (self, user, f=None):
        """Write one row describing the LDAP entry *user*; return *f*.

        Missing attributes become empty cells.  The status is "unknown" when
        the entry has no uid, "disabled" when the uid is in the cached
        disabled list, and empty otherwise.
        """
        if f is None:
            f = StringIO()
        uid = self._first_value(user, 'uid')
        cn = self._first_value(user, 'cn')
        mail = self._first_value(user, 'mail')
        if not uid:
            status = "unknown"
        elif uid in self.cache['disabled']:
            status = "disabled"
        else:
            status = ""
        csv.writer(f).writerow([
            uid,
            cn,
            mail,
            status])
        return f

    def total_allocation (self, project_id, resource_id):
        """Sum every allocation for the project/resource that started before the period end."""
        allocations = Session.query(Allocation).filter(and_(
            Allocation.project_id==Project.fetch(project_id).id,
            # The resource must be resolved as a Resource, as time_used()
            # and balance() do, not as a Project.
            Allocation.resource_id==Resource.fetch(resource_id).id,
            Allocation.start < self.end)).all()
        return sum((allocation.amount for allocation in allocations))

    def time_used (self, project_id, resource_id, total=False):
        """Return charges minus refunds for the project/resource.

        total -- when true, count every charge dated before the period end;
            otherwise only charges dated within [start, end).
        """
        project = Project.fetch(project_id)
        resource = Resource.fetch(resource_id)
        filters = [
            Allocation.project_id==project.id,
            Allocation.resource_id==resource.id,
            Charge.datetime < self.end]
        if not total:
            filters.append(Charge.datetime >= self.start)
        filter_ = and_(*filters)
        charged = Session.query(func.sum(Charge.amount)).join(Charge.allocation).filter(filter_)[0][0]
        refunded = Session.query(func.sum(Refund.amount)).join(Refund.charge, Charge.allocation).filter(filter_)[0][0]
        # SUM() over no rows yields None; treat that as zero.
        return (charged or 0) - (refunded or 0)

    def balance (self, project_id, resource_id, date):
        """Return allocated - charged + refunded over allocations active at *date*.

        An allocation counts when it started before *date* and ends after it;
        only charges dated on or before *date* are considered.
        """
        project = Project.fetch(project_id)
        resource = Resource.fetch(resource_id)
        filter_ = and_(
            Allocation.project_id==project.id,
            Allocation.resource_id==resource.id,
            Allocation.start < date,
            Allocation.end > date)
        allocated = Session.query(func.sum(Allocation.amount)).filter(filter_)[0][0]
        charged = Session.query(func.sum(Charge.amount)).join(Charge.allocation).filter(filter_).filter(Charge.datetime<=date)[0][0]
        refunded = Session.query(func.sum(Refund.amount)).join(Refund.charge, Charge.allocation).filter(filter_).filter(Charge.datetime<=date)[0][0]
        return (allocated or 0) - (charged or 0) + (refunded or 0)

    def new_allocation (self, project_id, resource_id):
        """Sum the allocations for the project/resource that started within [start, end)."""
        allocations = Session.query(Allocation).filter(and_(
            Allocation.project_id==Project.fetch(project_id).id,
            # Resolved as a Resource for the same reason as in total_allocation().
            Allocation.resource_id==Resource.fetch(resource_id).id,
            Allocation.start >= self.start, Allocation.start < self.end))
        return sum((allocation.amount for allocation in allocations))
def tsgetusage (fs):
    """Return per-group usage of filesystem *fs* as reported by ``tsgetusage``.

    Runs ``/usr/local/bin/tsgetusage -q -g <fs>`` on the storage host over
    ssh and maps the second whitespace-separated field of each output line
    to the third (both kept as the strings the command printed).
    Lines with fewer than three fields are ignored.
    """
    exec_host = "fs1-a"
    command = ["ssh", exec_host, "/usr/local/bin/tsgetusage", "-q", "-g", fs]
    process = subprocess.Popen(command, stdout=subprocess.PIPE)
    # communicate() drains the pipe, closes it and waits for the child, so
    # neither a zombie process nor an open descriptor is left behind.
    output = process.communicate()[0]
    usage = {}
    for line in output.splitlines():
        fields = line.split()
        if len(fields) < 3:
            # Blank or truncated line: nothing usable to record.
            continue
        usage[fields[1]] = fields[2]
    return usage
def group_members (group):
    """Return the ``memberUid`` values of the directory group named *group*.

    Returns an empty list when the search yields no entry or the entry has
    no ``memberUid`` attribute.  A single string value is wrapped in a list.
    """
    # Build the DN from the argument; the lookup used to be hard-wired to
    # the "disabled" group regardless of what the caller asked for.
    group_dn = "cn=%s,ou=group,dc=shaheen,dc=kaust,dc=edu,dc=sa" % group
    try:
        member_uids = l.search_s(group_dn, ldap.SCOPE_BASE)[0][1]['memberUid']
    except (IndexError, KeyError):
        return []
    if isinstance(member_uids, str):
        return [member_uids]
    return member_uids
def job_counts (start, end, bg=False):
    """Count job steps per account that completed within [start, end).

    bg -- when true count only steps of type "bluegene"; otherwise count
        every step that is not of that type.
    Returns a dict mapping account to the number of job steps.
    """
    in_period = and_(JobStep.completion_time>=start, JobStep.completion_time<end)
    if bg:
        on_resource = JobStep.type=="bluegene"
    else:
        on_resource = JobStep.type!="bluegene"
    rows = loadl.model.Session.query(JobStep.account, func.count(JobStep._id))
    rows = rows.filter(in_period).filter(on_resource).group_by(JobStep.account)
    counts = {}
    for row in rows:
        counts[row[0]] = row[1]
    return counts
def last_jobs (end, bg=False):
    """Find, per account, the latest job-step completion time before *end*.

    bg -- when true consider only steps of type "bluegene"; otherwise
        consider every step that is not of that type.
    Returns a dict mapping account to that completion time.
    """
    if bg:
        on_resource = JobStep.type=="bluegene"
    else:
        on_resource = JobStep.type!="bluegene"
    rows = loadl.model.Session.query(JobStep.account, func.max(JobStep.completion_time))
    rows = rows.filter(JobStep.completion_time<end).filter(on_resource).group_by(JobStep.account)
    latest = {}
    for row in rows:
        latest[row[0]] = row[1]
    return latest
class MasterReport (Report):
    """Site-wide report: resource use of every project and status of every user.

    ``email`` builds a multipart message whose body is the report text
    (plain and HTML) with two CSV attachments: resource use and users.
    """

    # reStructuredText source of the message body.  The blank line after the
    # field list is required by docutils to terminate the list.
    rst_template = textwrap.dedent("""\
        =================
        KSL status report
        =================
        :Period start date: %(start)s
        :Period end date: %(end)s (exclusive)

        This report indicates the current use of KSL resources for all
        projects, and the status of all defined users. Pertinent data
        is included in the attached files, presented in csv format.
        Use of computational systems (e.g., the Blue Gene) is reported
        in core-hours. Use of storage systems (e.g., the /scratch
        filesystem) is reported in terabytes (1000^3 bytes).
        Comments or questions concerning the data presented here
        should be sent to the KAUST Supercomputing Laboratory
        helpdesk, help@hpc.kaust.edu.sa.""")

    def send (self, destination):
        """Mail this report to *destination*."""
        # Deliver the assembled message with the module's own mailer.
        sendmail(self.email, destination)

    def _get_rst (self):
        """Fill the body template with the formatted period bounds."""
        start_s = format_date(self.start)
        end_s = format_date(self.end)
        return self.rst_template % {'start':start_s, 'end':end_s}
    rst = property(_get_rst)

    def _get_csv (self):
        """Return a StringIO holding the resource-use CSV for all projects."""
        buf = StringIO()
        self._write_csv_header(f=buf)
        for project in get_projects():
            self._write_computational_csv(project, f=buf)
            self._write_storage_csv(project, f=buf)
        return buf
    csv = property(_get_csv)

    def _get_user_csv (self):
        """Return a StringIO holding the CSV listing of all users."""
        buf = StringIO()
        self._write_user_csv_header(f=buf)
        for user in get_users():
            self._write_user_csv(user, f=buf)
        return buf
    user_csv = property(_get_user_csv)

    def _get_email (self):
        """Assemble the message; no ``To`` header is set here."""
        end_s = format_date(self.end)
        message = MIMEMultipart()
        message['Subject'] = "KSL status report for the period ending %s" % end_s
        message['From'] = "KAUST Supercomputing Laboratory <help@hpc.kaust.edu.sa>"
        # Plain-text and HTML renderings of the same body.
        body = MIMEMultipart("alternative")
        body.attach(MIMEText(self.rst, "plain"))
        body.attach(MIMEText(self.html, "html"))
        message.attach(body)
        resource_attachment = MIMEText(self.csv.getvalue())
        resource_attachment.add_header('Content-Disposition', 'attachment',
            filename="resource_use_%s.csv" % end_s)
        resource_attachment.set_type("text/csv")
        message.attach(resource_attachment)
        user_attachment = MIMEText(self.user_csv.getvalue())
        user_attachment.add_header('Content-Disposition', 'attachment',
            filename="users_%s.csv" % end_s)
        user_attachment.set_type("text/csv")
        message.attach(user_attachment)
        return message
    email = property(_get_email)
class ProjectReport (Report):
    """Report on a single project, addressed to the project's PI.

    ``email`` builds a multipart message whose body is the report text
    (plain and HTML) with two CSV attachments: resource use and members.
    """

    # reStructuredText source of the message body.  Blank lines before the
    # section titles and the indented blocks after "::" are significant:
    # docutils needs them to end lists and to recognise literal blocks.
    rst_template = textwrap.dedent("""\
        =================
        KSL status report
        =================
        :Period start date: %(start)s
        :Period end date: %(end)s (exclusive)

        This report indicates the membership of and use of KSL
        resources by project %(project)s.
        Additional data is provided in the attached csv file.
        Comments or questions concerning the data presented here should be
        sent to the KAUST Supercomputing Laboratory helpdesk,
        help@hpc.kaust.edu.sa.

        Project members
        ===============
        %(members)s

        Computational allocations
        =========================
        This data can be obtained on-demand from a KSL front-end node
        using the ``cbank`` command::

            cbank -p %(project)s -a %(start)s -b %(end)s

        Shaheen (Blue Gene)
        -------------------
        %(bluegene_cpu)s

        Neser (Cluster)
        ---------------
        %(cluster_cpu)s

        Storage used
        ============
        This data can be obtained on-demand from a KSL front-end node
        using the ``du`` command::

            du -xhs /project/%(project)s /scratch/project/%(project)s

        /project
        --------
        %(project_data)s TB

        /scratch/project
        ----------------
        %(project_scratch)s TB""")

    def __init__ (self, project, start, end):
        """project -- LDAP ``(dn, attributes)`` entry; start/end -- report period."""
        Report.__init__(self, start, end)
        self.project = project

    def _get_rst (self):
        """Fill the body template with this project's members and usage."""
        project_id = self.project[1]['cn'][0]
        start_s = format_date(self.start)
        end_s = format_date(self.end)
        return self.rst_template % {
            'project':project_id,
            'members':self.member_rst(),
            'bluegene_cpu':self.cpu_rst("shaheenaccess"),
            'cluster_cpu':self.cpu_rst("neseraccess"),
            'project_data':convert_tb(self.cache['storage-used']['/project'].get(project_id, 0)),
            'project_scratch':convert_tb(self.cache['storage-used']['/scratch'].get(project_id, 0)),
            'start':start_s, 'end':end_s}
    rst = property(_get_rst)

    def member_rst (self):
        """Return the members as a bullet list, active members first."""
        # Materialise once: the members property performs a directory lookup
        # per member each time it is iterated.
        members = list(self.members)
        active_members = [member for member in members if not self.is_disabled(member)]
        disabled_members = [member for member in members if self.is_disabled(member)]
        return os.linesep.join(
            [format_member(member) for member in active_members]
            + [format_member(member, disabled=True) for member in disabled_members])

    def is_disabled (self, user):
        """Return True when the entry's uid is in the cached disabled list.

        An entry without a uid is reported as not disabled.
        """
        try:
            uid = user[1]['uid'][0]
        except (KeyError, IndexError):
            return False
        return uid in self.cache['disabled']

    def _get_email (self):
        """Assemble the message addressed to the PI.

        Raises NoOwner (from the ``pi`` lookup) when the project has no owner.
        """
        end_s = format_date(self.end)
        project_id = self.project[1]['cn'][0]
        message = MIMEMultipart()
        message['Subject'] = "KSL status report for %s" % project_id
        message['From'] = "KAUST Supercomputing Laboratory <help@hpc.kaust.edu.sa>"
        message['To'] = self.pi
        # Plain-text and HTML renderings of the same body.
        body = MIMEMultipart("alternative")
        body.attach(MIMEText(self.rst, "plain"))
        body.attach(MIMEText(self.html, "html"))
        message.attach(body)
        resource_attachment = MIMEText(self.csv.getvalue())
        resource_attachment.add_header('Content-Disposition', 'attachment',
            filename="resource_use_%s_%s.csv" % (project_id, end_s))
        resource_attachment.set_type("text/csv")
        message.attach(resource_attachment)
        member_attachment = MIMEText(self.user_csv.getvalue())
        member_attachment.add_header('Content-Disposition', 'attachment',
            filename="members_%s_%s.csv" % (project_id, end_s))
        member_attachment.set_type("text/csv")
        message.attach(member_attachment)
        return message
    email = property(_get_email)

    def _get_members (self):
        """Return an iterator over the project's member entries."""
        return get_project_members(self.project)
    members = property(_get_members)

    def _get_pi (self):
        """Return the PI's address string; raises NoOwner if there is none."""
        return get_project_pi(self.project)
    pi = property(_get_pi)

    def _get_csv (self):
        """Return a StringIO holding this project's resource-use CSV."""
        buf = StringIO()
        self._write_csv_header(f=buf)
        self._write_computational_csv(self.project, f=buf)
        self._write_storage_csv(self.project, f=buf)
        return buf
    csv = property(_get_csv)

    def _get_user_csv (self):
        """Return a StringIO holding the CSV listing of this project's members."""
        buf = StringIO()
        self._write_user_csv_header(f=buf)
        for user in self.members:
            self._write_user_csv(user, f=buf)
        return buf
    user_csv = property(_get_user_csv)

    def cpu_rst (self, resource_id):
        """Return an RST field list summarising use of one computational resource.

        resource_id -- "shaheenaccess" or "neseraccess"; anything else
            raises Exception.
        """
        template = textwrap.dedent("""\
            :Job steps: %(job_steps)s
            :Resources used: %(resources_used)s core-hours
            :Balance: %(balance)s core-hours""")
        converters = {
            "shaheenaccess": convert_bg,
            "neseraccess": convert_x86}
        try:
            convert = converters[resource_id]
        except KeyError:
            # Report the offending id (the message used to reference an
            # undefined name and failed with NameError instead).
            raise Exception("unknown resource %s" % resource_id)
        project_id = self.project[1]['cn'][0]
        data = {
            'job_steps':
                self.cache['job-count'][resource_id].get(project_id, 0),
            'resources_used':
                convert(self.time_used(project_id, resource_id, total=False)),
            'balance':
                convert(self.balance(project_id, resource_id, self.end))}
        return template % data
def get_project_pi (project):
    """Return the project owner as ``"Name <mail>"``.

    project -- LDAP ``(dn, attributes)`` entry of the project.

    Raises NoOwner when the project entry has no ``owner`` attribute.  When
    the owner DN does not resolve to a directory entry, the DN itself is
    returned.
    """
    project_dn, project_attributes = project[0], project[1]
    project_id = project_attributes['cn'][0]
    try:
        owner_dn = l.search_s(project_dn, ldap.SCOPE_BASE)[0][1]['owner'][0]
    except KeyError:
        raise NoOwner("%s has no owner" % project_id)
    try:
        owner = l.search_s(owner_dn, ldap.SCOPE_BASE)[0][1]
    except ldap.NO_SUCH_OBJECT:
        # The owner is not a known entry; fall back to the raw DN.
        return owner_dn
    owner_mail = owner['mail'][0]
    owner_cn = owner['cn'][0]
    details = {'owner_mail':owner_mail, 'owner_cn':owner_cn}
    return "%(owner_cn)s <%(owner_mail)s>" % details
def convert_tb (bytes):
    """Divide *bytes* (number or numeric string) by 1000**3, rounded to 3 places."""
    divisor = 1000 ** 3
    scaled = Decimal(bytes) / divisor
    return round(scaled, 3)
def convert_bg (amount):
    """Scale a Blue Gene *amount* by 4 / 60 / 60 and round to a whole number."""
    scaled = Decimal(amount) * 4 / 60 / 60
    rounded = round(scaled, 0)
    return int(rounded)
def convert_x86 (amount):
    """Scale a cluster *amount* by 8 / 60 / 60 and round to a whole number."""
    scaled = Decimal(amount) * 8 / 60 / 60
    rounded = round(scaled, 0)
    return int(rounded)
def format_date (date):
    """Render *date* as ``YYYY-MM-DD``."""
    layout = "%Y-%m-%d"
    return date.strftime(layout)
def format_member (user, disabled=False):
    """Format an LDAP user entry as one reStructuredText bullet item.

    user -- LDAP ``(dn, attributes)`` entry.
    disabled -- append " (disabled)" to the item when true.

    The item reads ``* Name <mail>``, or just ``* Name`` when the entry has
    no mail.  When the entry has no ``cn``, the string form of the entry
    itself stands in for the name.
    """
    try:
        cn = user[1]['cn'][0]
    except (KeyError, IndexError):
        # Fall back to the entry itself; this used to reference an undefined
        # name and raised NameError.
        cn = str(user)
    try:
        mail = user[1]['mail'][0]
    except (KeyError, IndexError):
        user_string = cn
    else:
        user_string = "%(cn)s <%(mail)s>" % {'mail':mail, 'cn':cn}
    if disabled:
        user_string += " (disabled)"
    return "* %s" % user_string
def get_projects ():
    """Return the ``kslProject`` entries directly under the group subtree."""
    base_dn = "ou=group,dc=shaheen,dc=kaust,dc=edu,dc=sa"
    only_projects = "(objectclass=kslProject)"
    return l.search_s(base_dn, ldap.SCOPE_ONELEVEL, only_projects)
def get_project_members (project):
    """Yield the directory entry of each ``memberUid`` listed on *project*.

    project -- LDAP ``(dn, attributes)`` entry.  A project without a
    ``memberUid`` attribute yields nothing; a single string value is
    treated as a one-element list.
    """
    attributes = project[1]
    if 'memberUid' in attributes:
        member_uids = attributes['memberUid']
        if isinstance(member_uids, str):
            member_uids = [member_uids]
    else:
        member_uids = []
    for uid in member_uids:
        member_dn = "uid=%s,ou=users,dc=shaheen,dc=kaust,dc=edu,dc=sa" % uid
        yield l.search_s(member_dn, ldap.SCOPE_BASE)[0]
def get_users ():
    """Return the ``kslAccount`` entries directly under the users subtree."""
    base_dn = "ou=users,dc=shaheen,dc=kaust,dc=edu,dc=sa"
    only_accounts = "(objectclass=kslAccount)"
    return l.search_s(base_dn, ldap.SCOPE_ONELEVEL, only_accounts)
def get_uids ():
    """Return the first ``uid`` value of every user entry, as a list."""
    uids = []
    for entry in get_users():
        uids.append(entry[1]['uid'][0])
    return uids
def indent (iterable):
    """Prefix every item of *iterable* with a space and concatenate the results."""
    prefixed = [" %s" % line for line in iterable]
    return "".join(prefixed)
# Generate and send the reports only when executed as a script, not when
# the module is imported.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment