Last active
July 8, 2021 15:14
-
-
Save FindHao/73c16e20e8eb285e6daf58d5a45ca4dc to your computer and use it in GitHub Desktop.
hpc job notify
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import argparse | |
import subprocess | |
import time | |
import requests | |
# Monitoring modes; the command-line -m/--mode option is mapped onto these.
START_MODE, FINISH_MODE, FULL_MODE = range(3)

# Error codes returned by get_job() / work(); any value in err_list makes the
# caller abort.
err_no_jobs, err_no_such_process, err_no_specific_job = 10, 11, 12
err_unknown = 100
err_list = [err_unknown, err_no_jobs, err_no_such_process, err_no_specific_job]

# Return code meaning "notification sent / nothing more to do".
success = 0

# Symbolic job states that JOB.status is compared against.
STATUS_PENDING, STATUS_RUN, STATUS_FINISH = 1, 2, 3
class JOB:
    """Record of one batch job as parsed from a ``bjobs`` output row.

    A freshly constructed instance has an empty name and no id, queue or
    status; the parsing code fills those in afterwards.
    """

    def __init__(self):
        self.name = ''      # job name column
        self.pid = None     # numeric job id, None until parsed
        self.queue = None   # queue name, None until parsed
        self.status = None  # job state, None until parsed

    def __str__(self):
        """Return a two-line table: a header row and this job's values.

        Unset fields are rendered as ``-`` instead of raising, so the object
        can be printed before every field has been filled in.
        """
        header = '{:10s} {:10s} {:10s}\n'.format("job id", "queue", "job name")
        pid = '-' if self.pid is None else str(self.pid)
        queue = '-' if self.queue is None else str(self.queue)
        name = '' if self.name is None else str(self.name)
        return header + '{:<10s} {:10s} {:10s}'.format(pid, queue, name)
# Module-level record of the job being monitored; stays None until get_job()
# or work() creates a JOB instance and assigns it here.
job = None
def send_notification(message):
    """POST *message* to the Pushover messages endpoint and print the outcome.

    Args:
        message: text placed in the ``message`` field of the request.

    Network failures (connection errors, timeouts) are reported on stdout
    like any other failed send instead of propagating, so a flaky network
    does not abort the monitoring loop that called this function.
    """
    # NOTE(review): "token" and "user" are empty here — presumably they must
    # be filled in with the Pushover application token and user key.
    post_data = {"token": "",
                 "user": "",
                 "message": message}
    try:
        # Without a timeout requests may wait on the server indefinitely.
        r = requests.post("https://api.pushover.net/1/messages.json",
                          data=post_data, timeout=30)
    except requests.RequestException as e:
        print("notification send error")
        print(e)
        return
    if r.status_code != 200:
        print("notification send error")
    else:
        print("successfully send notification")
def get_job():
    """Select the job to monitor when no job id was given explicitly.

    Runs ``bjobs`` and, when exactly one job row is listed, records it in the
    module-level ``job`` and prints it.

    Returns:
        ``success`` when a single job was found and recorded;
        ``err_no_jobs`` when no job rows are listed;
        ``err_no_specific_job`` when more than one row is listed;
        ``err_unknown`` when ``bjobs`` fails, times out, cannot be started,
        or prints a row that cannot be parsed.
    """
    global job
    try:
        out_bytes = subprocess.check_output(['bjobs'], stderr=subprocess.STDOUT, timeout=10)
    except (subprocess.CalledProcessError, subprocess.TimeoutExpired, OSError) as e:
        print(e)
        return err_unknown

    # The first output line is skipped (header row); blank lines carry no job.
    rows = [ln for ln in out_bytes.decode('utf-8').split('\n')[1:] if ln.strip()]
    if not rows:
        print("no jobs running")
        return err_no_jobs
    if len(rows) > 1:
        print("There are multiple jobs running. Please specify which one you'd like to monitor via option -i.")
        return err_no_specific_job

    items = rows[0].split()
    try:
        # NOTE(review): column positions (0 = job id, 2 = state, 4 = queue,
        # last = job name) assume this site's bjobs layout — confirm.
        r_pid = int(items[0])
        job_status = items[2]
        job_queue = items[4]
        job_name = items[-1]
    except (ValueError, IndexError):
        print("unexpected bjobs output:", rows[0])
        return err_unknown

    if not job:
        job = JOB()
    job.name = job_name
    job.pid = r_pid
    job.queue = job_queue
    # Store the state as a STATUS_* constant so comparisons such as
    # ``job.status == STATUS_PENDING`` can match; unknown states are kept raw.
    job.status = {
        'PEND': STATUS_PENDING,
        'RUN': STATUS_RUN,
        'DONE': STATUS_FINISH,
        'EXIT': STATUS_FINISH,
    }.get(job_status, job_status)
    print(job)
    return success
def work(pid, mode):
    """Poll ``bjobs`` once and react to the current state of job *pid*.

    Args:
        pid: numeric job id matched against the first column of each row.
        mode: ``START_MODE`` to notify when the job's state is ``RUN``; any
            other value to notify once the job is no longer listed.

    Returns:
        ``success`` after a start or finish notification was sent;
        an ``err_*`` code when monitoring cannot continue;
        ``None`` when the job is still listed and the caller should poll
        again.
    """
    global job
    try:
        out_bytes = subprocess.check_output(['bjobs'], stderr=subprocess.STDOUT, timeout=10)
    except (subprocess.CalledProcessError, subprocess.TimeoutExpired, OSError) as e:
        print(e)
        return err_unknown

    # The first output line is skipped (header row); blank lines are dropped.
    rows = [ln.split() for ln in out_bytes.decode('utf-8').split('\n')[1:] if ln.strip()]
    if not rows and not job:
        # Nothing listed and the job was never seen: there is nothing to wait
        # for. When the job IS known, an empty listing means it has left the
        # queue and is handled by the "not found" logic below.
        print("no jobs running")
        return err_no_jobs

    found = False
    for items in rows:
        # Skip rows that do not look like a job row (non-numeric first column
        # or too few columns) instead of crashing on them.
        # NOTE(review): column positions (0 = job id, 2 = state, 4 = queue,
        # last = job name) assume this site's bjobs layout — confirm.
        if len(items) < 5 or not items[0].isdigit():
            continue
        if int(items[0]) != pid:
            continue
        found = True
        job_status = items[2]
        if not job:
            job = JOB()
        job.name = items[-1]
        job.pid = pid
        job.queue = items[4]
        # Remember the last seen state so a later disappearance can be
        # classified (pending job vanished => killed).
        job.status = {'PEND': STATUS_PENDING, 'RUN': STATUS_RUN}.get(job_status, job_status)
        print(job)
        if mode == START_MODE and job_status == 'RUN':
            message = "summit job starts: \njob_id queue job_name\n%d %s %s" % (
                pid, job.queue, job.name)
            send_notification(message)
            return success

    if found:
        return None

    if mode == START_MODE:
        # Accept both the STATUS_* constant and the raw bjobs string, since
        # the status may have been recorded in either form.
        if job and job.status in (STATUS_PENDING, 'PEND'):
            message = "summit job is killed: \njob_id queue job_name\n%d %s %s" % (
                pid, job.queue, job.name)
            send_notification(message)
        else:
            print("didn't find target process")
        return err_no_such_process

    if not job:
        print("no such job is running")
        return err_no_such_process

    message = "summit job finished: \njob_id queue job_name\n%d %s %s" % (
        pid, job.queue, job.name)
    send_notification(message)
    return success
def call_work(pid, amode, interval=10):
    """Call ``work(pid, amode)`` repeatedly until it succeeds or fails.

    Args:
        pid: job id forwarded to ``work``.
        amode: monitoring mode forwarded to ``work``.
        interval: seconds to sleep between polls (default 10).

    Returns when ``work`` returns ``success``.

    Raises:
        SystemExit: with status -1, after printing the code, when ``work``
            returns a value contained in ``err_list``.
    """
    while True:
        return_code = work(pid, amode)
        if return_code == success:
            break
        if return_code in err_list:
            print(return_code)
            # SystemExit is what exit() raises, without depending on the
            # site-provided exit() helper being available.
            raise SystemExit(-1)
        time.sleep(interval)
def main(argv=None):
    """Parse the command line and monitor one job.

    Args:
        argv: argument list to parse; ``None`` lets argparse use ``sys.argv``.

    Raises:
        SystemExit: with status -1 when the mode is not recognised or no
            single job could be selected automatically.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('-i', '--id', metavar='JOB_ID', type=int,
                        required=False, dest='pid', action='store',
                        help='The process id')
    # Start mode is the documented default, so the option must be optional.
    parser.add_argument('-m', '--mode', metavar='MODE',
                        required=False, default='start', dest='mode', action='store',
                        help='s|start: when the status of target process becomes running, '
                             'it sends notification. It is default mode. '
                             'f|finish: when the target process finished, it sends notification. '
                             'full: start + finish')
    args = parser.parse_args(argv)

    modes = {
        's': START_MODE, 'start': START_MODE,
        'f': FINISH_MODE, 'finish': FINISH_MODE,
        'full': FULL_MODE,
    }
    mode_name = args.mode or 'start'
    if mode_name not in modes:
        print("wrong argument: mode", args.mode)
        raise SystemExit(-1)
    mode = modes[mode_name]

    if args.pid is None:
        # No id given: let get_job() pick the job and read its id from the
        # module-level ``job`` it fills in.
        if get_job() in err_list:
            raise SystemExit(-1)
        pid = job.pid
    else:
        pid = args.pid

    if mode == FULL_MODE:
        call_work(pid, START_MODE)
        call_work(pid, FINISH_MODE)
    else:
        call_work(pid, mode)


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment