Created
June 17, 2019 01:46
-
-
Save ScottWales/6bc498ef8b1d09d94213837d92a596ca to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# | |
# Copyright 2019 Scott Wales | |
# | |
# Author: Scott Wales <scott.wales@unimelb.edu.au> | |
# | |
# Licensed under the Apache License, Version 2.0 (the "License"); | |
# you may not use this file except in compliance with the License. | |
# You may obtain a copy of the License at | |
# | |
# http://www.apache.org/licenses/LICENSE-2.0 | |
# | |
# Unless required by applicable law or agreed to in writing, software | |
# distributed under the License is distributed on an "AS IS" BASIS, | |
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
# See the License for the specific language governing permissions and | |
# limitations under the License. | |
import pwd
import re
import subprocess
import sys

import numpy
import pandas
# Matches a queue header line in nqstat output: "<queue> <state> ====...".
# nqstat() uses the 'queue' group to tag the job lines that follow it.
queue_re = re.compile(
    r'^(?P<queue>\S+)\s+'
    r'(?P<state>\S+)\s+'
    r'=+')

# Matches a single job line in nqstat output; every group is captured as a
# string. The job name is matched non-greedily so it stops at the first
# whitespace run that is followed by the numeric columns. A greedy match
# would include the column padding spaces as part of the name.
job_re = re.compile(
    r'^(?P<jobid>\d+)\s+'
    r'(?P<status>\S+)\s+'
    r'(?P<user>\S+)\s+'
    r'(?P<group>\S+)\s+'
    r'(?P<name>.+?)\s+'
    r'(?P<cpu_percent>\d+)\s+'
    r'(?P<walltime>\d+:\d+:\d+)\s+'
    r'(?P<wall_limit>\d+:\d+:\d+)\s+'
    r'(?P<vmem>\S+)\s+'
    r'(?P<mem>\S+)\s+'
    r'(?P<mem_limit>\S+)\s+'
    r'(?P<ncpus>\d+)'
    r'.*')
def nqstat(project, host='raijin', nqstat_path='/opt/pbs/default/bin/nqstat'):
    """
    Run ``nqstat -P <project>`` on a remote host over ssh and parse the jobs

    Each job line of the output is parsed with ``job_re`` into a dict of its
    named groups (all values are strings). An extra ``'queue'`` key holds the
    name from the most recent queue header line (``queue_re``) seen before
    it, or None if no header has been seen yet.

    Args:
        project: Project code passed to ``nqstat -P``
        host: Host to ssh to (default ``'raijin'``)
        nqstat_path: Path of the nqstat executable on ``host``

    Returns:
        list of dicts, one per job line
    """
    queue = None
    jobs = []
    result = subprocess.run(['ssh', host, nqstat_path, '-P', project],
                            stdout=subprocess.PIPE,
                            universal_newlines=True)
    if result.returncode != 0:
        # Don't abort the whole run for one failing project, but don't hide
        # the failure either - any output that was produced is still parsed
        print(f"Warning: nqstat for project {project} on {host} exited with "
              f"status {result.returncode}", file=sys.stderr)
    for line in result.stdout.splitlines():
        match = queue_re.match(line)
        if match is not None:
            # Header line: remember the queue for the job lines that follow
            queue = match.group('queue')
            continue
        match = job_re.match(line)
        if match is not None:
            job = match.groupdict()
            job['queue'] = queue
            jobs.append(job)
    return jobs
# Projects queried by clex_nqstat() by default
CLEX_PROJECTS = ('w35', 'w40', 'w42', 'w48', 'w97', 'v45')


def clex_nqstat(projects=CLEX_PROJECTS):
    """
    Jobs from all clex projects

    Args:
        projects: Iterable of project codes to query with ``nqstat``
            (defaults to ``CLEX_PROJECTS``)

    Returns:
        list of job dicts from every project, in project order
    """
    jobs = []
    for proj in projects:
        jobs.extend(nqstat(proj))
    return jobs
# Memory strings look like "<number><unit>B", e.g. "1.5GB" or "512MB". A
# bare byte count such as "512B" is also accepted. Matching is
# case-insensitive. The value is matched non-greedily so that the unit
# prefix, if present, lands in the 'unit' group.
mem_re = re.compile(r'^(?P<value>.+?)(?P<unit>[KMGTP]?)B$', re.IGNORECASE)

# Factor converting a value with the given (upper-case) unit prefix to MB,
# using binary units (1 GB = 1024 MB); '' is plain bytes
_MEM_SCALE_TO_MB = {
    '': 1024 ** -2,
    'K': 1024 ** -1,
    'M': 1,
    'G': 1024 ** 1,
    'T': 1024 ** 2,
    'P': 1024 ** 3,
}


def mem_to_mb(mem):
    """
    Convert a memory string such as "1.5GB" to a number of megabytes

    Args:
        mem: Memory string of the form "<number>[K|M|G|T|P]B"

    Returns:
        float, the size in MB

    Raises:
        ValueError: if ``mem`` cannot be parsed
    """
    match = mem_re.match(mem.strip())
    if match is None:
        raise ValueError(f"Cannot parse memory value {mem!r}")
    scale = _MEM_SCALE_TO_MB[match.group('unit').upper()]
    # float() itself raises ValueError for a non-numeric value part
    return float(match.group('value')) * scale
# Memory per CPU, in MB, for each queue. postproc_nqstat multiplies this by a
# job's ncpus to get the threshold used for its 'excessive_mem' check.
queue_mem = dict(
    normal=2 * 1024,
    express=2 * 1024,
    normalbw=4.5 * 1024,
    expressbw=4.5 * 1024,
    normalsl=6 * 1024,
)
# Columns produced by nqstat() for each job; used to build an empty frame
_JOB_COLUMNS = ['jobid', 'status', 'user', 'group', 'name', 'cpu_percent',
                'walltime', 'wall_limit', 'vmem', 'mem', 'mem_limit', 'ncpus',
                'queue']


def postproc_nqstat(jobs):
    """
    Convert to a dataframe

    Turns the job dicts from ``nqstat``/``clex_nqstat`` into a DataFrame
    indexed by 'jobid'. The numeric and time columns are converted to proper
    types, and these columns are derived:

    - mem_mb, mem_limit_mb: memory used / requested, in MB
    - su_to_date, su_limit: ncpus * walltime (resp. wall_limit) in hours
    - su_wasted, su_wasted_limit: the idle CPU fraction of those totals
    - excessive_mem: using less than ncpus * queue_mem[queue] MB while
      requesting more than that. Always False for queues not in queue_mem.

    Args:
        jobs: list of job dicts (possibly empty)

    Returns:
        pandas.DataFrame
    """
    if jobs:
        df = pandas.DataFrame(jobs)
    else:
        # An empty list would give a frame with no columns at all, making
        # set_index('jobid') (and a later groupby('user')) raise KeyError
        df = pandas.DataFrame(columns=_JOB_COLUMNS)
    df = df.set_index('jobid')

    df['ncpus'] = df['ncpus'].astype(int)
    df['cpu_percent'] = df['cpu_percent'].astype(int)
    df['walltime'] = pandas.to_timedelta(df['walltime'])
    df['wall_limit'] = pandas.to_timedelta(df['wall_limit'])
    df['mem_mb'] = df['mem'].apply(mem_to_mb)
    df['mem_limit_mb'] = df['mem_limit'].apply(mem_to_mb)

    hours_used = df['walltime'].dt.total_seconds() / 3600.0
    hours_limit = df['wall_limit'].dt.total_seconds() / 3600.0
    df['su_to_date'] = df['ncpus'] * hours_used
    df['su_limit'] = df['ncpus'] * hours_limit

    idle_fraction = 1 - df['cpu_percent'] / 100.0
    df['su_wasted'] = idle_fraction * df['su_to_date']
    df['su_wasted_limit'] = idle_fraction * df['su_limit']

    # Queues missing from queue_mem (or None, for a job seen before any queue
    # header) map to NaN. Both comparisons against NaN are False, so those
    # jobs are simply not flagged, rather than raising KeyError.
    min_mem = df['ncpus'] * df['queue'].map(queue_mem)
    df['excessive_mem'] = (df['mem_mb'] < min_mem) & (df['mem_limit_mb'] > min_mem)
    return df
def report_badness(job):
    """
    Create a single job report

    Args:
        job: One row of the ``postproc_nqstat`` dataframe. This is a pandas
            Series whose ``name`` is the PBS job id.

    Returns:
        The report text for the job, or None for good jobs
    """
    problems = []

    # Low CPU efficiency is only reported if enough SU would be wasted by
    # the time the job reaches its walltime limit
    if job['cpu_percent'] < 50 and job['su_wasted_limit'] > 200:
        problems.append(
            f'\tCPU usage is low, at {job["cpu_percent"]}%, if your job runs '
            f'to completion it will waste '
            f'{job["su_wasted_limit"] / 1000:.1f} kSU\n')

    if job['excessive_mem']:
        # Memory the job could request without exceeding the per-CPU share
        suggested_gb = queue_mem[job['queue']] / 1024 * job['ncpus']
        problems.append(
            f'\tThe memory request is excessive, you\'re only using '
            f'{job["mem_mb"] / 1024:.1f} GB out of a '
            f'{job["mem_limit_mb"] / 1024:.1f} GB request. '
            f'Requesting under {suggested_gb:.1f} GB may make your job '
            f'start faster\n')

    if not problems:
        return None
    header = f'For job "{job["name"]}" (PBS id {job.name}):\n'
    return header + ''.join(problems)
def collect_reports(df):
    """
    Collect all reports from a dataframe

    ``report_badness`` is applied to every row of ``df``. Rows that produce
    None are discarded, and the remaining report strings are joined with
    newlines. Returns an empty string when no job needs a report.
    """
    per_job = df.apply(report_badness, axis=1)
    flagged = per_job.dropna()
    return '\n'.join(flagged.values)
def main():
    """
    Email each user a report of their inefficient jobs on the CLEX projects

    Jobs are gathered with ``clex_nqstat`` and grouped by user. Every user
    with a non-empty ``collect_reports`` result gets a message sent through
    ``/bin/mailx``, and the message is also printed.
    """
    df = postproc_nqstat(clex_nqstat())
    for user, jobs in df.groupby('user'):
        report = collect_reports(jobs)
        if not report:
            continue

        try:
            # GECOS is conventionally "Full Name,office,phone,..."; greet the
            # user by the name field only
            full_name = pwd.getpwnam(user).pw_gecos.split(',')[0] or user
        except KeyError:
            # User isn't in the local passwd database; fall back to username
            full_name = user

        message = (
            f"Hi {full_name},\n\n"
            "We've noticed some of your jobs running on Raijin are using CLEX "
            "projects inefficiently. These jobs are:\n\n"
            f"{report}\n"
            "If you need help improving your runs please contact the CLEX CMS "
            "team by emailing <cws_help@nci.org.au>\n"
        )
        subject = f"CLEX resource warning for {user}"
        # NOTE(review): placeholder recipient - every user's report is sent
        # to this one fixed address, not to the user. Confirm the intent.
        address = 'xxx@nci.org.au'
        subprocess.run(['/bin/mailx', '-s', subject,
                        '-S', 'replyto=CLEX CMS<cws_help@nci.org.au>', address],
                       input=message.encode('utf-8'))
        print(message)


# Only query the queue and send mail when run as a script, not on import
if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment