#!/usr/bin/env python
#
# Copyright 2019 Scott Wales
#
# Author: Scott Wales <scott.wales@unimelb.edu.au>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import re
import subprocess
import pandas
import numpy
import pwd

# Header line for each queue section of the nqstat output: queue name, state, then a row of '='
queue_re = re.compile(
    r'^(?P<queue>\S+)\s+'
    r'(?P<state>\S+)\s+'
    r'=+')

# A single job line: id, status, user, group, job name, CPU %, walltime used and limit,
# vmem, memory used and limit, number of cpus
job_re = re.compile(
    r'^(?P<jobid>\d+)\s+'
    r'(?P<status>\S+)\s+'
    r'(?P<user>\S+)\s+'
    r'(?P<group>\S+)\s+'
    r'(?P<name>.+)\s+'
    r'(?P<cpu_percent>\d+)\s+'
    r'(?P<walltime>\d+:\d+:\d+)\s+'
    r'(?P<wall_limit>\d+:\d+:\d+)\s+'
    r'(?P<vmem>\S+)\s+'
    r'(?P<mem>\S+)\s+'
    r'(?P<mem_limit>\S+)\s+'
    r'(?P<ncpus>\d+)'
    r'.*')
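
# A hypothetical job line in that format (the exact nqstat column layout may differ):
#   '1234567 R abc123 w35 my_job.sh 95 01:23:45 05:00:00 3.2GB 2.8GB 4GB 16'
# job_re.match(...) on such a line yields a groupdict with string values for
# 'jobid', 'status', 'user', 'group', 'name', 'cpu_percent', 'walltime',
# 'wall_limit', 'vmem', 'mem', 'mem_limit' and 'ncpus'.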


def nqstat(project):
    """
    Run nqstat on Raijin for a single project, returning a list of job dicts
    """
    queue = None
    jobs = []
    r = subprocess.run(['ssh', 'raijin', '/opt/pbs/default/bin/nqstat', '-P', project],
                       stdout=subprocess.PIPE,
                       universal_newlines=True)
    for line in r.stdout.splitlines():
        # Remember the most recent queue header so jobs can be tagged with their queue
        match = queue_re.match(line)
        if match is not None:
            queue = match.group('queue')
        match = job_re.match(line)
        if match is not None:
            job = match.groupdict()
            job['queue'] = queue
            jobs.append(job)
    return jobs
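
# For illustration only (assumes working ssh access to raijin): nqstat('w35') returns
# something like [{'jobid': '1234567', 'queue': 'normal', 'mem': '2.8GB', ...}, ...];
# all values are strings at this point and are converted in postproc_nqstat() below.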


def clex_nqstat():
    """
    Jobs from all CLEX projects
    """
    jobs = []
    for proj in ['w35', 'w40', 'w42', 'w48', 'w97', 'v45']:
        jobs.extend(nqstat(proj))
    return jobs


# Memory strings reported by nqstat look like '500MB', '2GB', '1TB'
mem_re = re.compile(r'^(?P<value>.+)(?P<unit>.)B$')


def mem_to_mb(mem):
    """
    Convert a memory string such as '2GB' into megabytes
    """
    match = mem_re.match(mem)
    if match is None:
        raise ValueError(f"Could not parse memory value '{mem}'")
    if match.group('unit') == 'T':
        scale = 1024 ** 2
    elif match.group('unit') == 'G':
        scale = 1024 ** 1
    elif match.group('unit') == 'M':
        scale = 1
    elif match.group('unit') == 'K':
        scale = 1024 ** -1
    else:
        raise ValueError(f"Unknown unit {match.group('unit')}")
    return float(match.group('value')) * scale
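
# For example: mem_to_mb('500MB') == 500.0, mem_to_mb('2GB') == 2048.0,
# mem_to_mb('1TB') == 1048576.0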

# Nominal memory per cpu core (in MB) for each Raijin queue, used to decide
# whether a job's memory request is excessive
queue_mem = {
    'normal': 2 * 1024,
    'express': 2 * 1024,
    'normalbw': 4.5 * 1024,
    'expressbw': 4.5 * 1024,
    'normalsl': 6 * 1024,
}


def postproc_nqstat(jobs):
    """
    Convert the job list to a dataframe and add derived columns
    """
    df = pandas.DataFrame(jobs).set_index('jobid')
    df['ncpus'] = df['ncpus'].astype(int)
    df['cpu_percent'] = df['cpu_percent'].astype(int)
    df['walltime'] = pandas.to_timedelta(df['walltime'])
    df['wall_limit'] = pandas.to_timedelta(df['wall_limit'])
    df['mem_mb'] = df['mem'].apply(mem_to_mb)
    df['mem_limit_mb'] = df['mem_limit'].apply(mem_to_mb)
    # Service units used so far and at the walltime limit, counted as cpu-hours
    df['su_to_date'] = df['ncpus'] * df['walltime'].apply(lambda t: t.total_seconds() / 60.0 / 60.0)
    df['su_limit'] = df['ncpus'] * df['wall_limit'].apply(lambda t: t.total_seconds() / 60.0 / 60.0)
    # Fraction of those service units spent idle, based on the reported CPU usage
    df['su_wasted'] = (1 - df['cpu_percent'] / 100.0) * df['su_to_date']
    df['su_wasted_limit'] = (1 - df['cpu_percent'] / 100.0) * df['su_limit']
    # Flag jobs that fit within the queue's per-core memory but requested more than that
    min_mem = df['ncpus'] * df['queue'].apply(lambda q: queue_mem[q])
    df['excessive_mem'] = numpy.logical_and(df['mem_mb'] < min_mem, df['mem_limit_mb'] > min_mem)
    return df
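
# A worked example of the accounting above, with hypothetical numbers: a 16-cpu job
# that has run for 2 hours has su_to_date = 16 * 2 = 32 cpu-hours; at 25% CPU usage,
# su_wasted = (1 - 0.25) * 32 = 24 of those cpu-hours were effectively idle.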


def report_badness(job):
    """
    Create a single job report

    Returns None for good jobs
    """
    needs_report = False
    report = f'For job "{job["name"]}" (PBS id {job.name}):\n'
    if job['cpu_percent'] < 50 and job['su_wasted_limit'] > 200:
        needs_report = True
        report += f'\tCPU usage is low, at {job["cpu_percent"]}%; if your job runs to completion it will waste {job["su_wasted_limit"]/1000} kSU\n'
    if job['excessive_mem']:
        needs_report = True
        report += f'\tThe memory request is excessive: you\'re only using {job["mem_mb"]/1024} GB out of a {job["mem_limit_mb"]/1024} GB request. Requesting under {queue_mem[job["queue"]]/1024 * job["ncpus"]} GB may make your job start faster\n'
    if needs_report:
        return report
    else:
        return None


def collect_reports(df):
    """
    Collect all reports from a dataframe
    """
    reports = df.apply(report_badness, axis=1)
    return '\n'.join(reports[reports.notnull()].values)


# Gather the current jobs, then email each user whose jobs look inefficient
df = postproc_nqstat(clex_nqstat())
for user, jobs in df.groupby('user'):
    report = collect_reports(jobs)
    if len(report) > 0:
        pw = pwd.getpwnam(user)
        message = f"Hi {pw.pw_gecos},\n\n"
        message += "We've noticed some of your jobs running on Raijin are using CLEX projects inefficiently. These jobs are:\n\n"
        message += report
        message += '\n'
        message += "If you need help improving your runs please contact the CLEX CMS team by emailing <cws_help@nci.org.au>\n"
        subject = f"CLEX resource warning for {user}"
        address = 'xxx@nci.org.au'
        subprocess.run(['/bin/mailx', '-s', subject, '-S', 'replyto=CLEX CMS<cws_help@nci.org.au>', address],
                       input=message.encode('utf-8'))
        print(message)