Skip to content

Instantly share code, notes, and snippets.

@FindHao
Last active July 8, 2021 15:14
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save FindHao/73c16e20e8eb285e6daf58d5a45ca4dc to your computer and use it in GitHub Desktop.
Save FindHao/73c16e20e8eb285e6daf58d5a45ca4dc to your computer and use it in GitHub Desktop.
hpc job notify
#!/usr/bin/env python3
import argparse
import subprocess
import time
import requests
# Monitoring modes, selected on the command line with -m/--mode.
START_MODE = 0  # work() notifies once the job's bjobs status is RUN
FINISH_MODE = 1  # work() notifies once the job no longer appears in bjobs output
FULL_MODE = 2  # the script runs START_MODE monitoring, then FINISH_MODE monitoring
# Error codes returned by get_job() and work(); call_work() and the script
# entry point terminate the program when they receive any value in err_list.
err_no_jobs = 10  # bjobs listed nothing after its header line
err_no_such_process = 11  # the requested job id was not found in the bjobs listing
err_no_specific_job = 12  # get_job() could not settle on one job to monitor
err_unknown = 100  # the bjobs command itself failed
err_list = [err_unknown, err_no_jobs, err_no_such_process, err_no_specific_job]
success = 0  # returned by work() after it has sent the notification for its mode
# Job status constants; work() compares job.status against STATUS_PENDING.
STATUS_PENDING = 1
STATUS_RUN = 2
STATUS_FINISH = 3
class JOB:
    """Record describing one batch job taken from the ``bjobs`` listing.

    A new instance starts out empty; the code that parses ``bjobs`` output
    fills the attributes in afterwards.
    """

    def __init__(self):
        self.name = ''  # job name
        self.pid = None  # job id (an int once populated)
        self.queue = None  # queue name
        self.status = None  # job status

    @staticmethod
    def _cell(value):
        """Return ``value`` as text, using ``N/A`` for a field that is unset."""
        return 'N/A' if value is None else str(value)

    def __str__(self):
        """Return a two-line table: a header row and this job's id/queue/name.

        Fields that have not been populated yet are shown as ``N/A`` so that
        printing a partially filled record never raises.
        """
        header = '{:10s} {:10s} {:10s}'.format("job id", "queue", "job name")
        # Every cell is left-aligned in a 10-character column, which matches
        # the layout produced for a fully populated record.
        row = '{:<10s} {:<10s} {:<10s}'.format(
            self._cell(self.pid), self._cell(self.queue), self._cell(self.name))
        return header + '\n' + row
# Module-level record (a JOB instance) of the job being monitored; it stays
# None until get_job() or work() finds the job in the bjobs listing.
job = None
def send_notification(message):
    """Post ``message`` to the Pushover messages endpoint.

    Delivery is best effort: a network failure or a non-200 response is
    reported on stdout and never raised, so a notification problem cannot
    bring down the monitoring loop.

    Args:
        message: Text of the notification.

    Returns:
        None. The outcome is printed to stdout.
    """
    # NOTE(review): "token" and "user" are blank here; presumably they must
    # be filled in with real Pushover credentials before this can succeed.
    post_data = {"token": "",
                 "user": "",
                 "message": message}
    try:
        # requests waits indefinitely unless a timeout is given, which would
        # stall the whole monitor on an unresponsive network.
        r = requests.post("https://api.pushover.net/1/messages.json",
                          data=post_data, timeout=30)
    except requests.RequestException as e:
        print("notification send error")
        print(e)
        return
    if r.status_code != 200:
        print("notification send error")
    else:
        print("successfully send notification")
def get_job():
    """Choose the job to monitor from ``bjobs`` when no job id was given.

    Runs ``bjobs``, drops its first (header) line and keeps only rows that
    start with a numeric job id. When exactly one such row exists, its id,
    status, queue and name are stored in the module-level ``job`` record,
    which is then printed.

    The column positions used (id at 0, status at 2, queue at 4, name last)
    are the ones this script has always relied on; presumably they match the
    site's ``bjobs`` layout -- confirm against real output.

    Returns:
        None on success (``job`` has been populated), otherwise one of:
        err_no_jobs          -- no job rows were listed
        err_no_specific_job  -- more than one job is listed; the user has to
                                choose with -i
        err_unknown          -- ``bjobs`` failed, timed out or could not be
                                started
    """
    global job
    try:
        out_bytes = subprocess.check_output(['bjobs'], stderr=subprocess.STDOUT, timeout=10)
    except (subprocess.CalledProcessError, subprocess.TimeoutExpired, OSError) as e:
        # Non-zero exit status, no answer within the timeout, or the command
        # could not be executed at all.
        print(e)
        return err_unknown

    # Skip the header line, then keep only real job rows. Blank lines and any
    # other non-job text (messages, continuation lines) are ignored instead
    # of being fed to int().
    rows = []
    for line in out_bytes.decode('utf-8', errors='replace').split('\n')[1:]:
        items = line.split()
        if len(items) >= 5 and items[0].isdecimal():
            rows.append(items)

    if not rows:
        print("no jobs running")
        return err_no_jobs
    if len(rows) > 1:
        # Guessing would risk monitoring the wrong job, so make the user pick.
        print("There are multiple jobs running. Please specify which one you'd like to monitor via option -i.")
        return err_no_specific_job

    items = rows[0]
    if job is None:
        job = JOB()
    job.name = items[-1]
    job.pid = int(items[0])
    job.queue = items[4]
    job.status = items[2]  # raw status text exactly as printed by bjobs
    print(job)
    return None
def work(pid, mode):
    """Poll ``bjobs`` once and send the notification that ``mode`` calls for.

    The listing's first (header) line is dropped and only rows that start
    with a numeric job id are considered. The column positions used (id at
    0, status at 2, queue at 4, name last) are the ones this script has
    always relied on; presumably they match the site's ``bjobs`` layout --
    confirm against real output.

    Args:
        pid: Id of the job to look for.
        mode: START_MODE to report when the job starts running; any other
            value is treated as FINISH_MODE and reports when the job has
            left the listing.

    Returns:
        success              -- the notification for ``mode`` was sent
        None                 -- nothing to report yet; poll again later
        err_no_jobs          -- the job was never seen and no jobs are listed
        err_no_such_process  -- the job is not listed: either it was never
                                seen, or (START_MODE) it vanished before it
                                was observed running
        err_unknown          -- ``bjobs`` failed, timed out or could not be
                                started
    """
    global job
    try:
        out_bytes = subprocess.check_output(['bjobs'], stderr=subprocess.STDOUT, timeout=10)
    except (subprocess.CalledProcessError, subprocess.TimeoutExpired, OSError) as e:
        # Non-zero exit status, no answer within the timeout, or the command
        # could not be executed at all.
        print(e)
        return err_unknown

    # Keep only real job rows; blank lines and other non-job text are ignored
    # instead of being fed to int().
    rows = []
    for line in out_bytes.decode('utf-8', errors='replace').split('\n')[1:]:
        items = line.split()
        if len(items) >= 5 and items[0].isdecimal():
            rows.append(items)

    for items in rows:
        if int(items[0]) != pid:
            continue
        job_status = items[2]
        if job is None:
            job = JOB()
        job.name = items[-1]
        job.pid = pid
        job.queue = items[4]
        # Remember the latest status so that a later disappearance can be
        # classified; statuses without a matching constant are kept as text.
        if job_status == 'PEND':
            job.status = STATUS_PENDING
        elif job_status == 'RUN':
            job.status = STATUS_RUN
        else:
            job.status = job_status
        print(job)
        if mode == START_MODE and job_status == 'RUN':
            message = "summit job starts: \njob_id queue job_name\n%d %s %s" % (
                pid, job.queue, job.name)
            send_notification(message)
            return success
        # Job is listed but there is nothing to report yet.
        return None

    # From here on the job is absent from the listing.
    if job is None:
        # The job has never been seen, so there are no details to report.
        if not rows:
            print("no jobs running")
            return err_no_jobs
        if mode == START_MODE:
            print("didn't find target process")
        else:
            print("no such job is running")
        return err_no_such_process

    if mode == START_MODE:
        # The status may still be the raw 'PEND' text stored by get_job().
        if job.status in (STATUS_PENDING, 'PEND'):
            message = "summit job is killed: \njob_id queue job_name\n%d %s %s" % (
                pid, job.queue, job.name)
            send_notification(message)
        else:
            print("didn't find target process")
        # Either way the job can no longer start, so end the monitoring
        # rather than polling (and notifying) forever.
        return err_no_such_process

    message = "summit job finished: \njob_id queue job_name\n%d %s %s" % (
        pid, job.queue, job.name)
    send_notification(message)
    return success
def call_work(pid, amode):
    """Poll ``work(pid, amode)`` every 10 seconds until it reports ``success``.

    If ``work`` returns one of the codes in ``err_list``, that code is
    printed and the program exits with status -1. Any other result means
    "not done yet" and leads to another poll after the pause.

    Args:
        pid: Job id handed through to ``work``.
        amode: Monitoring mode handed through to ``work``.
    """
    finished = False
    while not finished:
        outcome = work(pid, amode)
        if outcome in err_list:
            print(outcome)
            exit(-1)
        if outcome == success:
            finished = True
        else:
            # Nothing to report yet; wait before asking bjobs again.
            time.sleep(10)
def main():
    """Parse the command line and monitor the selected job.

    Without -i/--id the job is chosen by ``get_job()``. The mode defaults to
    start when -m/--mode is omitted or empty. In full mode the job is
    monitored for its start first and for its finish afterwards.

    Exits with status -1 when no job could be selected or an argument is
    invalid.
    """
    parser = argparse.ArgumentParser()
    # The descriptions belong in help=; metavar is only the short placeholder
    # shown in the usage line.
    parser.add_argument('-i', '--id', metavar='JOB_ID',
                        required=False, dest='pid', action='store',
                        help='The process id')
    parser.add_argument('-m', '--mode', metavar='MODE',
                        required=False, default='start', dest='mode', action='store',
                        help='s|start: when the status of target process becomes running, it sends notification. It is default mode. f|finish: when the target process finished, it sends notification. full: start + finish')
    args = parser.parse_args()

    if not args.pid:
        if get_job() in err_list:
            raise SystemExit(-1)
        pid = job.pid
    else:
        try:
            pid = int(args.pid)
        except ValueError:
            print("wrong argument: id", args.pid)
            raise SystemExit(-1)

    modes = {
        's': START_MODE,
        'start': START_MODE,
        'f': FINISH_MODE,
        'finish': FINISH_MODE,
        'full': FULL_MODE,
    }
    # An empty value falls back to start mode.
    mode = modes.get(args.mode or 'start')
    if mode is None:
        print("wrong argument: mode", args.mode)
        raise SystemExit(-1)

    if mode == FULL_MODE:
        call_work(pid, START_MODE)
        call_work(pid, FINISH_MODE)
    else:
        call_work(pid, mode)


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment