Skip to content

Instantly share code, notes, and snippets.

@phistep
Created July 23, 2020 11:42
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save phistep/a7e96acdb00c7bb149395f52c1c7b0e3 to your computer and use it in GitHub Desktop.
Save phistep/a7e96acdb00c7bb149395f52c1c7b0e3 to your computer and use it in GitHub Desktop.
Minimal Parameter-Varying Batch Job Submission System
#!/usr/bin/python3
import argparse
import itertools as it
import os
import subprocess as sp
import multiprocessing as mp
# Command-line interface. The positional config file defines the batch (see
# the example below); the options control parallelism, niceness, push
# notifications and the tmux monitor.
parser = argparse.ArgumentParser(description="Run script for all combinations of arguments in parallel")
parser.add_argument('config', type=str, help="config file")
parser.add_argument('-n', '--processes', type=int, default=None,
                    help="number of parallel processes; defaults to number of cores")
parser.add_argument('-N', '--nice', type=int, default=10, help="niceness of processes")
# argparse reflows help text, so an embedded newline would be shown as a
# plain space; spell out the two-line layout of the auth file instead.
parser.add_argument('-p', '--push-notify', action='store_true',
                    help="use pushover to send notification when job finishes; auth is read from "
                         "~/.config/pushover (first line: APP_TOKEN, second line: USER_TOKEN)")
parser.add_argument('-m', '--monitor', action='store_true',
                    help="show progress of jobs (linecounts and cpu load) in a tmux overview")
args = parser.parse_args()
""" Config file example
# name of the batch job
name = 'example'
# tuple of strings that make up the command to be exectued.
# if its standalone program: ('programname',)
# if its an interoreter of sorts: ('bash', 'script.sh')
command = ('python3', '/home/phistep/University/Master/vaml/vamp/weight_distance.py')
# folder, file template including extension
# file template subs in parameter values (use "b={-b}" to render '-b 0.5' as "-b=0.5")
output = ('./batch-test', 'c={-c},d={-d}.tsv')
# base parameters for all runs
# dictionary with 'switch': value
base = {
'-a': 1000,
# just use switch without value
'-B': None,
'-c': 0,
'-d': 'auto'
}
# dictionary with 'switch': (tuple, of, possible, values)
# all combinations will be explored
varying = {
'-d': ('d1', 'd2'),
'-e': ('e1', 'e2', 'e3'),
# use one switch or the orther or none, can only be done for one set of
# mutually exclusive switches and blanks. These cannot be used in file name
# templating
None: ('-F', None),
}
# for when you need to vary parameters together
# this will give [{'-g':1, '-h':10}, {'-g':0.1, '-h':100}, {'-g':0.01, '-h':1000},]
# and will be combined in all combinations with the combinations from `varying` above
varying_combined = [
{
'-g': (1, 0.1, 0.001,),
'-h': (10, 100, 1000,),
},
]
# these will override any previous base options and will run as independed
# sets
extra = [
{'-c': 0.5, '-g': '0.5,0.5'},
{'-c': 0.5, '-g': '0.9,0.1'},
]
"""
# The config is plain Python run in this module's namespace; see the example
# above for the names it may define. exec() runs arbitrary code, so only use
# config files you trust. Compiling with the config's path makes errors in
# the config report that file instead of "<string>".
with open(args.config) as config_file:
    exec(compile(config_file.read(), args.config, 'exec'))
if args.monitor:
    # Monitor mode: attach to the tmux overview for this batch, creating it
    # first if necessary, then exit without starting any jobs.
    import shlex
    from sys import exit
    from glob import glob
    session_name = f'job_monitor_{name}'
    extension = os.path.splitext(output[1])[-1]
    load_cmd = "htop"
    # Command that `watch` re-runs: line counts of every output file so far.
    # shlex.quote handles each level of shell parsing (tmux's shell, then
    # watch's `sh -c`) instead of hand-nested quotes and backslash escapes.
    count_cmd = f"find {shlex.quote(output[0])} -name {shlex.quote('*' + extension)} -exec wc -l {{}} \\;"
    progress_cmd = f"watch {shlex.quote(count_cmd)}"
    inspect_cmd = "less +F {}"
    try:
        # Reuse a running session if there is one; non-zero exit means none exists.
        attached = sp.run(['tmux', 'attach-session', '-t', session_name], stderr=sp.DEVNULL)
        if attached.returncode:
            # tmux runs each window command through a shell itself, so each
            # command is passed as one argv entry with no outer shell.
            sp.run(['tmux', 'new-session', '-s', session_name, '-n', 'progress', '-d', progress_cmd],
                   check=True)
            sp.run(['tmux', 'split-window', '-t', session_name, '-h', load_cmd], check=True)
            for file_ in glob(os.path.join(output[0], '*' + extension)):
                sp.run(['tmux', 'new-window', '-t', session_name, '-n', os.path.basename(file_),
                        inspect_cmd.format(shlex.quote(file_))], check=True)
            # Select the first window by name: a literal index 0 does not
            # exist when tmux's base-index is non-zero.
            sp.run(['tmux', 'select-window', '-t', f'{session_name}:progress'], check=True)
            sp.run(['tmux', 'attach-session', '-t', session_name], check=True)
    finally:
        # attach-session returns on detach; tear the monitor session down.
        sp.run(['tmux', 'kill-session', '-t', session_name], stderr=sp.DEVNULL)
    exit()
# Optional config entries fall back to empty collections, so a config without
# them yields a single run using `base` (plus any `extra` sets). The
# list-typed entries default to lists, matching the documented config format.
base = globals().get('base', {})
varying = globals().get('varying', {})
varying_combined = globals().get('varying_combined', [])
extra = globals().get('extra', [])
def _merge(mappings):
    """Return a new dict combining `mappings` left to right; later values win."""
    merged = {}
    for mapping in mappings:
        merged.update(mapping)
    return merged


# For each `varying_combined` entry, walk its value tuples column by column so
# that run i of that entry uses the i-th value of every switch listed in it.
varied_combined = [
    [dict(zip(combined_set, column)) for column in zip(*combined_set.values())]
    for combined_set in varying_combined
]
# Cartesian product across the `varying_combined` entries.
varied_combind_sets = [_merge(choice) for choice in it.product(*varied_combined)]
# Cartesian product of every `varying` switch's candidate values.
varied_param_sets = [dict(zip(varying, values)) for values in it.product(*varying.values())]
# Cross both products; when a switch appears in both, the `varying` value wins.
varied_varied_sets = [_merge(pair) for pair in it.product(varied_combind_sets, varied_param_sets)]
# Layer every combination over `base`, then append the independent `extra` sets.
param_sets = [{**base, **varied} for varied in varied_varied_sets]
param_sets += [{**base, **extra_params} for extra_params in extra]
# One job description per parameter set, numbered from 1.
jobs = [
    {
        'id': id_,
        # Full argv: the command, then each switch and its value in order. A
        # None key (mutually exclusive slot) contributes only its value, and a
        # None value (bare flag) contributes only its switch.
        'args': [*command, *[str(item) for kv in param_set.items() for item in kv if item is not None]],
        # str.format(**...) only accepts string keys, so the None key used for
        # mutually exclusive switches must be left out of the template fields.
        'stdout': os.path.join(
            output[0],
            f'{id_:04d}_' + output[1].format(**{k: v for k, v in param_set.items() if isinstance(k, str)}),
        ),
    }
    for id_, param_set in enumerate(param_sets, 1)
]
num_jobs = len(jobs)
def run_job(job):
    """Run one job under `nice`, writing its stdout to the job's output file.

    `job` is one of the dicts in `jobs` ('id', 'args' = full argv, 'stdout' =
    output path). Adds the 'prog_name' and 'short_args' entries used in the
    status lines, prints a RUN line, runs the command, then calls
    job_finished() once the output file has been closed.
    """
    job['prog_name'] = os.path.basename(command[-1])
    job['short_args'] = ' '.join(job['args'][len(command):])
    print("[{name}#{id:04d}/{num_jobs} RUN ]: {prog_name} {short_args}".format(**job, name=name, num_jobs=num_jobs))
    # Pass argv as a list (no shell) so parameter values containing spaces or
    # shell metacharacters reach the program verbatim. Note that this also
    # means `~` / `$VAR` in the config are not expanded; use absolute paths.
    argv = ['nice', '-n', str(args.nice), *job['args']]
    # The configured output folder may not exist yet; exist_ok keeps this
    # safe when several workers try to create it at the same time.
    os.makedirs(os.path.dirname(job['stdout']) or '.', exist_ok=True)
    with open(job['stdout'], 'w') as stdout:
        sp.run(argv, stdout=stdout)
    job_finished(job)
def job_finished(job):
    """Report that `job` has finished and optionally send a push notification.

    Prints a DONE status line from the job's 'id', 'prog_name' and
    'short_args' entries (set by run_job). With --push-notify a Pushover
    notification is sent as well. A failure there is printed as a warning
    instead of raised, because an exception here would propagate out of the
    worker through Pool.map and abort the rest of the batch.
    """
    print("[{name}#{id:04d}/{num_jobs} DONE]: {prog_name} {short_args}".format(**job, name=name, num_jobs=num_jobs))
    if not args.push_notify:
        return
    import http.client
    title = "Job {name}#{id:04d}/{num_jobs} done.".format(**job, name=name, num_jobs=num_jobs)
    message = "{prog_name} {short_args}".format(**job)
    try:
        _send_pushover(title, message)
    except (OSError, ValueError, http.client.HTTPException) as err:
        print("[{name}#{id:04d}/{num_jobs} WARN]: push notification failed: {err}".format(
            **job, name=name, num_jobs=num_jobs, err=err))


def _send_pushover(title, message):
    """POST one notification to the Pushover messages API.

    Credentials come from ~/.config/pushover: the application token on the
    first line and the user key on the second.

    Raises:
        OSError: the auth file cannot be read or the connection fails.
        ValueError: the auth file has fewer than two lines.
        http.client.HTTPException: protocol error or a non-200 response.
    """
    import http.client
    import urllib.parse  # `import urllib` alone does not load urllib.parse

    with open(os.path.expanduser('~/.config/pushover')) as auth_file:
        # strip() drops stray '\r' / spaces that would corrupt the tokens.
        auth = [line.strip() for line in auth_file.read().splitlines()]
    if len(auth) < 2:
        raise ValueError("~/.config/pushover needs APP_TOKEN and USER_TOKEN on separate lines")
    body = urllib.parse.urlencode({
        "token": auth[0],
        "user": auth[1],
        "title": title,
        "message": message,
    })
    conn = http.client.HTTPSConnection("api.pushover.net:443")
    try:
        conn.request("POST", "/1/messages.json", body,
                     {"Content-type": "application/x-www-form-urlencoded"})
        response = conn.getresponse()
        response.read()
        if response.status != 200:
            raise http.client.HTTPException(f"Pushover returned HTTP {response.status} {response.reason}")
    finally:
        conn.close()
if __name__ == '__main__':
    # Workers started with the "spawn" method (macOS, Windows) re-import this
    # module; the guard keeps them from trying to create their own pools.
    if not args.processes:
        args.processes = mp.cpu_count()
    with mp.Pool(args.processes) as worker:
        worker.map(run_job, jobs)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment