Last active: May 27, 2022 20:04
Save austinschneider/a0fab0983950981fe7bb22b6375b4bc7 to your computer and use it in GitHub Desktop.
Slurm submission script for systems that limit the number of queued jobs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import glob | |
import time | |
import numpy as np | |
import subprocess | |
def slurm_get_n_jobs(user):
    """Return the number of jobs currently queued for `user`.

    Runs `squeue` with `--array` so each array task counts as one job, and
    counts the non-header lines of its pipe-delimited output.

    Parameters
    ----------
    user : str
        Slurm user name passed to `squeue --user`.

    Returns
    -------
    int
        Number of queued jobs (array tasks expanded).
    """
    command = f"squeue --user {user} --array --format=%all"
    result = subprocess.run(command.split(" "), stdout=subprocess.PIPE)
    # Decode the bytes output once; calling str() on raw bytes would yield
    # "b'...'" reprs instead of the actual text.
    lines = result.stdout.decode().split("\n")
    # The first line is the column header; every real job line contains
    # the "|" delimiter produced by --format=%all.
    job_lines = [s for s in lines[1:] if "|" in s]
    return len(job_lines)
def slurm_get_job_ids(user, job_name):
    """Return the array-task indices of queued jobs named `job_name`.

    Parses the pipe-delimited `squeue --format=%all` output. Only JOBID
    entries containing an underscore (array tasks, "<jobid>_<taskid>") are
    kept; the integer task index after the underscore is returned.

    Parameters
    ----------
    user : str
        Slurm user name passed to `squeue --user`.
    job_name : str
        Value matched against the NAME column.

    Returns
    -------
    list[int]
        Array task indices currently queued under `job_name`.
    """
    command = f"squeue --user {user} --array --format=%all"
    result = subprocess.run(command.split(" "), stdout=subprocess.PIPE)
    # Bug fix: decode the bytes output instead of wrapping each line in
    # str(). str(bytes) produces "b'...'" reprs, which corrupted the first
    # and last column names of the header (e.g. "b'JOBID") and broke
    # column_names.index("JOBID") below.
    lines = result.stdout.decode().split("\n")
    header, lines = lines[0], lines[1:]
    column_names = header.split("|")
    lines = [s for s in lines if "|" in s]
    if len(lines) == 0:
        return []
    # Transpose rows into columns so each entry of job_info is one column.
    job_info = list(zip(*[s.split("|") for s in lines]))
    job_name_index = column_names.index("NAME")
    job_id_index = column_names.index("JOBID")
    job_names = np.array(job_info[job_name_index])
    job_ids = np.array(job_info[job_id_index])
    job_name_mask = job_names == job_name
    return [int(s.split("_")[1]) for s in job_ids[job_name_mask] if "_" in s]
def filesystem_get_jobs_done(path):
    """Collect the integer ids of finished jobs.

    A job counts as done when its output file `<path>/<job_id>.json`
    exists; the id is parsed from the file name.

    Parameters
    ----------
    path : str
        Directory containing the per-job JSON output files.

    Returns
    -------
    set[int]
        Ids of all jobs with an output file present.
    """
    def parse_job_id(fname):
        # "<path>/<id>.json" -> <id>
        return int(fname.split(".json")[0].split("/")[-1])

    return {parse_job_id(fname) for fname in glob.glob(f"{path}/*.json")}
def filesystem_get_jobs_not_done(path, max_job_id):
    """Return the ascending list of job ids in [0, max_job_id) whose
    output file is not yet present under `path`.

    Parameters
    ----------
    path : str
        Directory scanned by `filesystem_get_jobs_done`.
    max_job_id : int
        Exclusive upper bound on the job ids considered.

    Returns
    -------
    list[int]
        Sorted ids of jobs still missing an output file.
    """
    done = filesystem_get_jobs_done(path)
    return sorted(set(range(max_job_id)) - done)
def get_job_ranges(jobs):
    """Compress job ids into Slurm array-range strings.

    Consecutive runs of ids collapse to "start-end"; isolated ids stay as a
    single number. The input is sorted first, so any iterable of unique
    ints is accepted (e.g. [1, 2, 3, 5] -> ["1-3", "5"]).

    Parameters
    ----------
    jobs : iterable[int]
        Unique job ids.

    Returns
    -------
    list[str]
        Range strings suitable for joining into an `sbatch --array` value.

    Raises
    ------
    ValueError
        If a duplicate id is encountered.
    """
    ranges = []
    start = None
    end = None
    for job in sorted(jobs):
        if start is None:
            # Open the first run.
            start, end = job, None
        else:
            tail = start if end is None else end
            if tail == job - 1:
                # Extend the current run by one.
                end = job
            elif tail < job - 1:
                # Gap: close the current run and open a new one.
                ranges.append([start, end])
                start, end = job, None
            else:
                # Duplicate id — sorted input should never step backwards.
                print(job, [start, end])
                raise ValueError("Bad stride")
    if start is not None:
        ranges.append([start, end])
    return [
        str(lo) if hi is None else f"{lo}-{hi}"
        for lo, hi in ranges
    ]
def submission_loop(
    user, job_name, total_jobs, job_executable, job_output_path, queue_max_jobs=500
):
    """Keep the Slurm queue topped up until every array task has finished.

    Polls once a minute: counts the user's queued jobs, determines which
    array indices in [0, total_jobs) have neither an output file nor a
    queued task, and submits a batch of them via `sbatch --array=...`,
    never exceeding `queue_max_jobs` queued jobs. Returns once nothing is
    queued and every job has produced its output file.

    Parameters
    ----------
    user : str
        Slurm user name.
    job_name : str
        Job name used to identify our tasks in the queue.
    total_jobs : int
        Total number of array tasks (ids 0 .. total_jobs - 1).
    job_executable : str
        Path to the batch script handed to `sbatch`.
    job_output_path : str
        Directory where finished tasks write `<id>.json`.
    queue_max_jobs : int, optional
        Cap on simultaneously queued jobs (default 500).
    """
    first = True
    while True:
        # Poll once a minute, skipping the delay on the very first pass.
        if not first:
            time.sleep(60)
        else:
            first = False
        n_jobs_in_queue = slurm_get_n_jobs(user)
        n_jobs_to_start = queue_max_jobs - n_jobs_in_queue
        if n_jobs_to_start <= 0:
            print("Max jobs queued")
            continue
        print(
            f"Have {n_jobs_in_queue} jobs in queue; can submit up to {n_jobs_to_start} new jobs"
        )
        jobs_queued = set(slurm_get_job_ids(user, job_name))
        # Give just-finished jobs a moment to flush their output files.
        time.sleep(10)
        # Bug fix (off-by-one): pass total_jobs as the exclusive upper bound.
        # The original passed total_jobs - 1, so the final array index was
        # never checked or submitted.
        jobs_not_done = set(filesystem_get_jobs_not_done(job_output_path, total_jobs))
        jobs_to_do = sorted(jobs_not_done - jobs_queued)
        jobs_to_do = jobs_to_do[:n_jobs_to_start]
        n_jobs_to_do = len(jobs_to_do)
        if n_jobs_to_do > 0:
            # Bug fix: the original string lacked the f prefix and printed
            # the literal text "{n_jobs_to_do}".
            print(f"Submitting {n_jobs_to_do} new jobs")
        elif n_jobs_in_queue > 0:
            print("No jobs to submit; waiting for jobs to finish")
            continue
        else:
            print("Nothing left to do; quitting now")
            break
        job_ranges = get_job_ranges(jobs_to_do)
        job_array = ",".join(job_ranges)
        submit_command = f"sbatch --array={job_array} {job_executable}"
        print(submit_command)
        result = subprocess.run(submit_command.split(" "), stdout=subprocess.PIPE)
        print(str(result.stdout))
if __name__ == "__main__":
    # Example configuration — edit these values for your cluster and user.
    # The guard keeps the submission loop from starting if this module is
    # ever imported for its helper functions.
    user = "SlurmUserName"
    job_name = "SlurmJobName"
    total_jobs = 2000
    job_executable = "/home/SlurmUserName/my_job.sh"
    job_output_path = "/home/SlurmUserName/my_output/"
    queue_max_jobs = 500
    submission_loop(
        user,
        job_name,
        total_jobs,
        job_executable,
        job_output_path,
        queue_max_jobs=queue_max_jobs,
    )
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.