@austinschneider
Last active May 27, 2022 20:04
Slurm submission script for systems that limit the number of queued jobs

import glob
import time
import subprocess

import numpy as np


def slurm_get_n_jobs(user):
    """Count how many jobs (array tasks included) the user has in the queue."""
    command = f"squeue --user {user} --array --format=%all"
    result = subprocess.run(command.split(" "), stdout=subprocess.PIPE)
    lines = result.stdout.decode().split("\n")
    header, lines = lines[0], lines[1:]
    # Data rows are pipe-separated; anything else (e.g. the trailing blank line) is discarded.
    lines = [s for s in lines if "|" in s]
    n_jobs = len(lines)
    return n_jobs


def slurm_get_job_ids(user, job_name):
    """Return the array task ids of queued jobs whose NAME matches job_name."""
    command = f"squeue --user {user} --array --format=%all"
    result = subprocess.run(command.split(" "), stdout=subprocess.PIPE)
    lines = result.stdout.decode().split("\n")
    header, lines = lines[0], lines[1:]
    column_names = header.split("|")
    lines = [s for s in lines if "|" in s]
    if len(lines) == 0:
        return []
    # Transpose the rows into columns so each field can be indexed by name.
    job_info = list(zip(*[s.split("|") for s in lines]))
    job_name_index = column_names.index("NAME")
    job_id_index = column_names.index("JOBID")
    job_names = np.array(job_info[job_name_index])
    job_ids = np.array(job_info[job_id_index])
    job_name_mask = job_names == job_name
    # Array tasks are listed as "<jobid>_<taskid>"; keep only the task id.
    job_ids = [int(s.split("_")[1]) for s in job_ids[job_name_mask] if "_" in s]
    return job_ids
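

# For illustration (exact columns depend on the Slurm version): a squeue row
# for an array task might read "12345_17|...|my_job_name|...", in which case
# slurm_get_job_ids would report task id 17 for job_name "my_job_name".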


def filesystem_get_jobs_done(path):
    """Collect the task ids of finished jobs from their output files."""
    # Each finished task <id> is expected to have written <path>/<id>.json.
    fnames = glob.glob(f"{path}/*.json")
    get_job_id = lambda s: int(s.split(".json")[0].split("/")[-1])
    jobs_done = set([get_job_id(fname) for fname in fnames])
    return jobs_done


def filesystem_get_jobs_not_done(path, max_job_id):
    """List the task ids in [0, max_job_id] that have no output file yet."""
    jobs_done = filesystem_get_jobs_done(path)
    # max_job_id is inclusive, hence the + 1.
    jobs_not_done = [i for i in range(max_job_id + 1) if i not in jobs_done]
    return jobs_not_done


def get_job_ranges(jobs):
    """Collapse task ids into the range strings accepted by sbatch --array."""
    strides = []
    stride = None  # [start, end]; end is None while the range has one element
    for i in sorted(jobs):
        if stride is None:
            stride = [i, None]
        elif stride[1] is None and stride[0] == i - 1:
            stride = [stride[0], i]
        elif stride[1] is None and stride[0] < i - 1:
            strides.append(stride)
            stride = [i, None]
        elif stride[1] == i - 1:
            stride = [stride[0], i]
        elif stride[1] < i - 1:
            strides.append(stride)
            stride = [i, None]
        else:
            # Duplicate or unsorted ids would land here.
            print(i, stride)
            raise ValueError("Bad stride")
    if stride is not None:
        strides.append(stride)
    return [
        str(stride[0]) if stride[1] is None else (str(stride[0]) + "-" + str(stride[1]))
        for stride in strides
    ]
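

# For illustration: given the logic above, get_job_ranges({0, 1, 2, 5, 7, 8})
# returns ["0-2", "5", "7-8"], the comma-joinable syntax sbatch --array expects.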


def submission_loop(
    user, job_name, total_jobs, job_executable, job_output_path, queue_max_jobs=500
):
    """Submit array tasks in batches until every task has produced output."""
    max_job_id = total_jobs - 1
    first = True
    while True:
        # Poll once a minute after the first pass.
        if not first:
            time.sleep(60)
        else:
            first = False
        n_jobs_in_queue = slurm_get_n_jobs(user)
        n_jobs_to_start = queue_max_jobs - n_jobs_in_queue
        if n_jobs_to_start <= 0:
            print("Max jobs queued")
            continue
        print(
            f"Have {n_jobs_in_queue} jobs in queue; can submit up to {n_jobs_to_start} new jobs"
        )
        jobs_queued = set(slurm_get_job_ids(user, job_name))
        time.sleep(10)
        jobs_not_done = set(filesystem_get_jobs_not_done(job_output_path, max_job_id))
        # A task needs submitting only if it is unfinished and not already queued.
        jobs_to_do = jobs_not_done - jobs_queued
        jobs_to_do = sorted(jobs_to_do)
        jobs_to_do = jobs_to_do[:n_jobs_to_start]
        n_jobs_to_do = len(jobs_to_do)
        if n_jobs_to_do > 0:
            print(f"Submitting {n_jobs_to_do} new jobs")
        elif n_jobs_in_queue > 0:
            print("No jobs to submit; waiting for jobs to finish")
            continue
        else:
            print("Nothing left to do; quitting now")
            break
        job_ranges = get_job_ranges(jobs_to_do)
        job_array = ",".join(job_ranges)
        submit_command = f"sbatch --array={job_array} {job_executable}"
        print(submit_command)
        result = subprocess.run(submit_command.split(" "), stdout=subprocess.PIPE)
        print(result.stdout.decode())


user = "SlurmUserName"
job_name = "SlurmJobName"
total_jobs = 2000
job_executable = "/home/SlurmUserName/my_job.sh"
job_output_path = "/home/SlurmUserName/my_output/"
queue_max_jobs = 500

submission_loop(
    user,
    job_name,
    total_jobs,
    job_executable,
    job_output_path,
    queue_max_jobs=queue_max_jobs,
)
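
For reference, a minimal sketch of the kind of payload my_job.sh might run. The file name and output path here are assumptions; the only contract the submission loop relies on is that array task N eventually writes N.json into job_output_path.

# payload.py -- hypothetical worker invoked by my_job.sh
import json
import os

# Slurm sets SLURM_ARRAY_TASK_ID for each task of an array job.
task_id = int(os.environ["SLURM_ARRAY_TASK_ID"])

result = {"task_id": task_id}  # placeholder for the real computation

# Writing this file is what marks the task as done for the submission loop.
with open(f"/home/SlurmUserName/my_output/{task_id}.json", "w") as f:
    json.dump(result, f)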