Skip to content

Instantly share code, notes, and snippets.

@theideasmith
Last active July 13, 2017 15:18
Show Gist options
  • Save theideasmith/bc9d93db195085176472df81c27d4e9f to your computer and use it in GitHub Desktop.
Save theideasmith/bc9d93db195085176472df81c27d4e9f to your computer and use it in GitHub Desktop.
Columbia Cluster Submit Library

Cluster Submit Kit

Download this repo and add submitgen.py to your path in the project you are using. Then type python submitgen.py help for instructions on how to use the submit script.

Once you have used submitgen.py to generate a submit script, submit it with the command that matches the scheduler of the cluster you are on. The yeti backend writes #PBS directives, so on yeti run qsub submitscript.sh; the habanero backend writes #SBATCH directives, so on habanero run sbatch submitscript.sh.

You can add a directory to your path by appending export PATH="$PATH:new/path" to the bottom of your .bashrc file and then running source ~/.bashrc to reload it.

import os
from mkdirp import mkdir_p
class FileGenerator(object):
    """Accumulate lines of text in memory and write them out to a file.

    ``add_line`` and ``write`` return ``self`` so calls can be chained.
    """

    def __init__(self, fname=""):
        """Start with no lines; ``fname`` is the default target of ``write``."""
        self.lines = []
        # NOTE(review): this instance attribute shadows the ``suffix`` method
        # defined below, so ``instance.suffix(...)`` is not callable. Left
        # untouched because outside code may read the attribute -- confirm
        # and then drop one of the two.
        self.suffix = ""
        self.fname = fname

    def add_line(self, *args):
        """Append one line: every argument converted to ``str``, space-joined."""
        self.lines.append(' '.join(str(part) for part in args))
        return self

    def suffix(self, suffix):
        """Store ``suffix`` on the instance (unreachable on instances, see __init__)."""
        self.suffix = suffix

    def sufix(self, sufix):
        """Store ``sufix`` on the instance.

        NOTE(review): the assignment replaces this very method on the
        instance, so it can only be called once per object.
        """
        self.sufix = sufix

    def joinlines(self):
        """Return all accumulated lines joined with newlines."""
        return '\n'.join(self.lines)

    def write(self, fname=None):
        """Write the accumulated lines to ``fname`` (default: ``self.fname``).

        The parent directory is created first when it is missing.
        """
        if not fname:
            fname = self.fname
        dirname = os.path.dirname(fname)
        # A bare file name has an empty dirname; creating '' would raise, and
        # the current directory already exists anyway.
        if dirname and not os.path.isdir(dirname):
            mkdir_p(dirname)
        with open(fname, 'w') as f:
            f.write(self.joinlines())
        return self
import inspect
def helpcomponents_f(f):
    """Describe the signature of ``f`` for a usage message.

    Returns a pair ``(help_args, help_kwargs)``:

    * ``help_args`` -- names of the parameters without defaults, followed by
      ``'<arg>, ...'`` when ``f`` accepts ``*args``.
    * ``help_kwargs`` -- ``(name, default)`` pairs for the parameters with
      defaults, followed by ``('<kwarg>', '<val>,...')`` when ``f`` accepts
      ``**kwargs``.
    """
    # inspect.getargspec was removed in Python 3.11; getfullargspec shares its
    # first four fields, so use it when present and fall back on Python 2.
    getspec = getattr(inspect, "getfullargspec", None) or inspect.getargspec
    spec = getspec(f)
    argnames, varargs, kwargnames, defaults = spec[0], spec[1], spec[2], spec[3]

    defaults = defaults or ()
    n_required = len(argnames) - len(defaults)

    help_args = list(argnames[:n_required])
    if varargs:
        help_args.append('<arg>, ...')

    # Materialise the pairs: zip() is lazy on Python 3 and cannot be appended to.
    help_kwargs = list(zip(argnames[n_required:], defaults))
    if kwargnames:
        help_kwargs.append(('<kwarg>', '<val>,...'))
    return help_args, help_kwargs
def helpstr(mainf, *fs):
    """Combine the usage components of ``mainf`` with those of every ``fs``.

    Returns ``(help_args, help_kwargs)`` as produced by ``helpcomponents_f``
    for ``mainf``, extended in order with the components of each extra
    function.
    """
    positional, keyword = helpcomponents_f(mainf)
    for extra in fs:
        extra_positional, extra_keyword = helpcomponents_f(extra)
        positional += extra_positional
        keyword += extra_keyword
    return positional, keyword
import errno
import os
def mkdir_p(path):
    """Create ``path`` and any missing parents, like ``mkdir -p``.

    An already existing directory is not an error; every other ``OSError``
    (including ``path`` existing as a non-directory) is re-raised.
    """
    try:
        os.makedirs(path)
    except OSError as err:
        already_a_directory = err.errno == errno.EEXIST and os.path.isdir(path)
        if not already_a_directory:
            raise
import re
import sys
import inspect
from functools import wraps
import helpstr
# Patterns for command-line tokens.
#
# kwargs_words   -- key=<anything>; captures (key, value).
# kwargs_numbers -- key=<number>, with optional sign, decimal point and
#                   exponent; captures (key, number).
# args           -- a number that carries an exponent (captured as mantissa
#                   and exponent), or otherwise the whole token.
#
# PLEASE READ BELOW
# It took bloodshed to carve these regexes out of
# characterspace^(len_regex) possibilities.
#
# Raw strings keep the backslashes intact without invalid-escape warnings.
# Keys use [A-Za-z_] rather than [A-z]: the latter also matches the
# punctuation that sits between 'Z' and 'a' in ASCII ([ \ ] ^ and backtick).
kwargs_words = r"([A-Za-z_]+[A-Za-z0-9_]*)\ *\=(.+)"
kwargs_numbers = r"([A-Za-z_]+[A-Za-z0-9_]*)\ *\=(\-*[0-9]*\.*[0-9]+(?:e\-*\d+)*)"
re_kwargs_words = re.compile(kwargs_words)
re_kwargs_numbers = re.compile(kwargs_numbers)
args = r"(\-*[0-9]*\.*[0-9]+)(e\-*\d+)|(.+)"
re_args = re.compile(args)
def numread(string):
    """Convert ``string`` to an ``int`` if possible, else a ``float``.

    When neither conversion accepts the value (``ValueError``), it is
    returned unchanged.
    """
    for convert in (int, float):
        try:
            return convert(string)
        except ValueError:
            continue
    return string
def boolread(string):
    """Map the exact texts "False", "None" and "True" to their Python objects.

    Any other value is returned unchanged.
    """
    literals = (("False", False), ("None", None), ("True", True))
    for text, value in literals:
        if string == text:
            return value
    return string
def matchany_or_last(regex, string):
    """Collect every match of ``regex`` in ``string`` and strip them out.

    Three variants of ``regex`` are applied in turn, each to what the previous
    one left behind:

    1. ``regex`` followed by a comma (optionally surrounded by spaces),
    2. ``regex`` on its own,
    3. ``regex`` preceded by ``,_`` (optionally surrounded by spaces).

    Returns ``(matches, remainder)``: the ``findall`` results of the three
    passes concatenated in that order, and ``string`` with all matched text
    removed.
    """
    # Raw strings: the same pattern text as before, minus the invalid escape
    # sequences that non-raw literals trigger on newer Python versions.
    # NOTE(review): the underscore in the third pattern looks like a typo for
    # a plain comma; kept as-is because changing it alters what is matched.
    patterns = (
        regex + r"\ *\,\ *",
        regex,
        r"\ *\,_\ *" + regex,
    )
    compiled = [re.compile(pattern) for pattern in patterns]

    total = []
    for matcher in compiled:
        total.extend(matcher.findall(string))
        string = matcher.sub('', string)
    return total, string
def _collectkwargs(argv):
    """Extract ``key=value`` pairs from ``argv``.

    Returns ``(kwargs, remainder)``. ``kwargs`` maps each key to its value
    after ``numread`` and ``boolread`` conversion; ``remainder`` is ``argv``
    with the ``kwargs_words`` matches removed (the text left over by the
    ``kwargs_numbers`` pass is discarded).
    """
    word_pairs, remainder = matchany_or_last(kwargs_words, argv)
    number_pairs, _ = matchany_or_last(kwargs_numbers, remainder)

    parsed = {}
    for key, raw_value in list(word_pairs) + list(number_pairs):
        # Later pairs overwrite earlier ones with the same key.
        parsed[key] = boolread(numread(raw_value))
    return parsed, remainder
def collectkwargs(argv):
    """Return only the keyword dictionary parsed from ``argv``."""
    return _collectkwargs(argv)[0]
def collectargs(argv):
    """Parse the positional values found in ``argv``.

    Every ``re_args`` match has its capture groups joined back into one
    string, which is then converted with ``numread`` and ``boolread``.

    Returns a list (a comprehension rather than chained ``map`` calls, so the
    result is a real list on Python 3 as well, where ``map`` is lazy).
    """
    return [
        boolread(numread(''.join(groups)))
        for groups in re_args.findall(argv)
    ]
def parseargs(argc):
    """Split one command-line token into ``(kwargs, args)``.

    Keyword pairs are pulled out first with ``_collectkwargs``; whatever text
    it leaves behind is parsed as positional values by ``collectargs``.
    """
    keyword_values, leftover = _collectkwargs(argc)
    return keyword_values, collectargs(leftover)
class shify(object):
    """Decorator that can run the decorated function as a command-line script.

    Typical use is ``@shify(name=__name__, ...)``. The function is invoked
    through ``_shify`` at decoration time when ``name == "__main__"`` or when
    ``cli`` is ``"allways"`` (historical spelling) or ``"always"``. All other
    positional and keyword arguments are forwarded to ``_shify``.
    """

    def __init__(self, *args, **kwargs):
        self.args = args
        self.kwargs = kwargs

    def __call__(self, func):
        """Optionally run ``func`` from the command line; always return it."""
        # Work on a copy so that the decorator object can be applied to more
        # than one function without losing its ``name``/``cli`` settings.
        options = dict(self.kwargs)
        name = options.pop("name", "__imported__")
        cli = options.pop("cli", "never")
        if name == "__main__" or cli in ("allways", "always"):
            _shify(func, *self.args, **options)
        # Hand the function back even after running it, so the decorated
        # name is never rebound to None.
        return func
def _shify(f,
           string='',
           name="__imported__",
           usage=None,
           includes=(),
           _debug=False, **kwargs):
    """
    This is a micro argparse for turning any
    python function into a bash script.
    For scientists who are busy and need
    to throw something together quickly
    Handles strings and integers
    as kwargs and args

    Parameters:
        f        -- the function to call with the parsed arguments.
        string   -- '' to read ``sys.argv[1:]``, a single token string, or an
                    iterable of token strings.
        name     -- accepted for symmetry with ``shify``; not used here.
        usage    -- optional text printed after the generated help.
        includes -- extra functions whose signatures are listed in the help.
        _debug   -- print the parsed args and kwargs before calling ``f``.
        kwargs   -- accepted and ignored.

    Returns the result of ``f(*args, **kwargs)``, or None when help was
    requested (``f`` is not called in that case).
    """
    if string == '':
        string = sys.argv[1:]
    elif isinstance(string, str):
        # An explicit single token is parsed like a one-element argv.
        string = [string]

    parsed_kwargs = {}
    parsed_args = []
    repeated = set()
    for token in string:
        token_kwargs, token_args = parseargs(token)
        for key, value in token_kwargs.items():
            if key not in parsed_kwargs:
                parsed_kwargs[key] = value
            elif key in repeated:
                parsed_kwargs[key].append(value)
            else:
                # Second occurrence of a key: gather its values in a list.
                parsed_kwargs[key] = [parsed_kwargs[key], value]
                repeated.add(key)
        parsed_args.extend(token_args)

    if "help" in parsed_args or "help" in parsed_kwargs:
        help_args, help_kwargs = helpstr.helpstr(f, *includes)
        pre = " "
        # Single-argument print(...) behaves the same on Python 2 and 3.
        print("usage: {}".format(f.__name__))
        print(pre + "[help]")
        for arg in help_args:
            print(pre + "[{}]".format(arg))
        for key, value in help_kwargs:
            print(pre + "[{}={}]".format(key, value))
        if usage:
            print(pre + usage)
            print(pre + "-" * len(usage.split('\n')[0]))
        return

    if _debug:
        print(parsed_args)
        print(parsed_kwargs)
    return f(*parsed_args, **parsed_kwargs)
def demo_2(g=6, h=7):
    """Print ``g`` and ``h`` separated by a single space."""
    # Single-argument print(...) gives the same output on Python 2 and 3.
    print("%s %s" % (g, h))
@shify(name=__name__,
       includes=[demo_2],
       usage="A py2sh demo")
def py2sh_demo(a, b, c=1, d=2, *args, **kwargs):
    """Demo entry point: print the extra positional and keyword arguments."""
    # Single-argument print(...) gives the same output on Python 2 and 3.
    print(args)
    print(kwargs)
from submitkit import *
from py2sh import shify
@shify(name=__name__,
usage="""
Generate submit scripts for models
Options:
+-----------------------------------------------------------+
| bash | a list of bash commands to run, |
| | bash="command1" bash="command2" ... |
+-----------------------------------------------------------+
| mem | how much memory to use |
+-----------------------------------------------------------+
| walltime | how long job should run for. make sure to |
| | use the right format for the cluster you |
| | are on |
+-----------------------------------------------------------+
| nodes | how many nodes to use. make sure it is |
| | within cluster limits |
+-----------------------------------------------------------+
| processors | processors per node. standard warnings apply|
+-----------------------------------------------------------+
| array | how many array jobs to run |
+-----------------------------------------------------------+
| submitscript| the name of the cluster submit script |
| | you want to generate |
+-----------------------------------------------------------+
| jobname | the name of the job to be submitted |
+-----------------------------------------------------------+
| outdir | where job log files should be written to. |
| | the outfiles will be name <jobname>.out |
+-----------------------------------------------------------+
| email | the email to send job notifications to |
+-----------------------------------------------------------+
| location | the name of the cluster currently running |
| | currently supports {yeti, habanero, native} |
| | if native, submit script will run like reg |
| | bash script |
+-----------------------------------------------------------+
""")
def trainkit_submit(
        bash=[],
        mem="64000mb",
        walltime="1-0",
        nodes=1,
        processors=8,
        array=1,
        submitscript="",
        jobname="",
        outdir=None,
        location="yeti",
        email="acl2205@columbia.edu",
        **kwargs):
    """Generate a cluster submit script and write it to ``submitscript``.

    The options are described in the usage table passed to ``shify`` above.
    ``bash`` may be a single command or a list of commands (the default list
    is never mutated). ``outdir`` defaults to the current working directory
    and is created when missing. Extra keyword arguments are ignored.

    Raises ValueError when ``submitscript`` is empty or ``location`` is not
    one of "yeti", "habanero" or "native".
    """
    if not isinstance(bash, list):
        bash = [bash]

    # ---------------( Initialization )-----------
    if not submitscript:
        # abspath('') is the current directory, which cannot be written as a file.
        raise ValueError("submitscript must name the file to generate")
    submitscript = os.path.abspath(submitscript)
    # abspath(None) raises, so fall back to the working directory.
    job_outdir = os.path.abspath(outdir if outdir is not None else os.getcwd())
    mkdir_p(job_outdir)

    # ------------( Computing Location )------------------
    if location == "yeti":
        submission = YetiSubmit(submitscript)
        groupname = "yetistats"
    elif location == "habanero":
        submission = HabaneroSubmit(submitscript)
        groupname = "stats"
    elif location == "native":
        groupname = "lipshitz"
        submission = ClusterSubmit(submitscript)
    else:
        raise ValueError(
            "unknown location {!r}; expected yeti, habanero or native".format(location))

    # -----------( Generating Submit Script )------------
    submission\
        .bash("#!/bin/sh")\
        .bash("#Config")\
        .email(email)\
        .jobname(jobname)\
        .hardware(nodes=nodes, processors=processors)\
        .time(walltime)\
        .groupname(groupname)\
        .mem(mem)

    isarr = array > 1
    if isarr:
        submission.array(array)

    submission.out(job_outdir, isarray=isarr, modelname=jobname)
    # Not every backend defines envexport (the plain ClusterSubmit may not).
    if hasattr(submission, "envexport"):
        submission.envexport()

    # An explicit loop: map() is lazy on Python 3 and would add nothing.
    for command in bash:
        submission.bash(command)

    submission\
        .bash("#Running")\
        .write()
from filegenerators import *
def p(*args):
    """Debug helper: print the positional arguments as a tuple."""
    # Single-argument print(...) gives the same output on Python 2 and 3.
    print(args)
class ClusterSubmit(FileGenerator):
    """Scheduler-neutral submit-script builder.

    ``self.sufmap`` maps each option name to the text placed in front of its
    value; ``std_sufix`` is put at the start of every directive line. In this
    base class all of them are empty strings; subclasses fill them in for a
    concrete scheduler. Every builder method returns ``self`` for chaining.

    NOTE(review): with the empty base prefixes, directive lines consist of the
    bare value preceded by a space -- confirm that this is what the "native"
    mode is meant to produce.
    """

    def __init__(self, *args, **kwargs):
        suffix_map = {
            "std_sufix": "",
            "jobname": "",
            "groupname": "",
            "mem": "",
            "outputdir": "",
            "array": "",
            "time": "",
            "nodes": "",
            "processors": "",
            "email": "",
            # outmap[isarray](directory, modelname) -> text for the output line
            "outmap": {True: lambda a, m: "", False: lambda a, m: ""}
        }
        self.sufmap = suffix_map
        super(ClusterSubmit, self).__init__(*args, **kwargs)

    def initline(self, *args):
        """Add a directive line: the standard prefix followed by ``args``."""
        self.add_line(*([self.sufmap["std_sufix"]] + list(args)))
        return self

    def _directive(self, key, value):
        """Add the directive for option ``key`` with ``value`` converted to str."""
        return self.initline(self.sufmap[key] + str(value))

    def array(self, narray):
        """Add the array-job directive (returns self like its siblings)."""
        return self._directive("array", narray)

    def jobname(self, name):
        """Add the job-name directive."""
        return self._directive("jobname", name)

    def groupname(self, name):
        """Add the group/account directive."""
        return self._directive("groupname", name)

    def time(self, time):
        """Add the wall-time directive."""
        return self._directive("time", time)

    def mem(self, mem):
        """Add the memory directive."""
        return self._directive("mem", mem)

    def out(self, to, isarray=False, modelname=""):
        """Add the output directive built by ``outmap[isarray](to, modelname)``."""
        suf = self.sufmap["outmap"][isarray](to, modelname)
        return self.initline(self.sufmap["outputdir"] + suf)

    def hardware(self, nodes=1, processors=1):
        """Add one directive for the node count and one for the processors."""
        self._directive("nodes", nodes)
        self._directive("processors", processors)
        return self

    def envexport(self):
        """Do nothing here; subclasses add an environment-export directive.

        Present on the base class so that callers can invoke it on any
        backend.
        """
        return self

    def bash(self, *bashline, **kwargs):
        """Add a plain line to the script.

        With ``iff`` omitted (None) or equal to True the line is added as is;
        with ``iff`` equal to False it is added prefixed with "# "; any other
        ``iff`` value adds nothing.
        """
        iff = kwargs.pop('iff', None)
        if iff is None or iff == True:  # noqa: E712 - equality, as before
            self.add_line(*bashline)
        elif iff == False:  # noqa: E712 - equality, as before
            self.add_line("# ", *bashline)
        return self

    def email(self, email):
        """Add the notification e-mail directive."""
        return self._directive("email", email)
class YetiSubmit(ClusterSubmit):
    """Submit-script builder that writes ``#PBS`` directives."""

    def __init__(self, *args, **kwargs):
        super(YetiSubmit, self).__init__(*args, **kwargs)
        self.sufmap.update({
            "std_sufix": "#PBS",
            # qsub takes the job name and the address as separate option
            # arguments ("-N name", "-M addr"), not "-N=name".
            "jobname": "-N ",
            "groupname": "-W group_list=",
            "mem": "-l mem=",
            "outputdir": "-o localhost:",
            "array": "-t 1-",
            "time": "-l walltime=",
            "nodes": "-l nodes=",
            "processors": "ppn=",
            "email": "-M "
            # dont need to update outmap for yeti
        })
        # NOTE(review): this puts "#PBS -V" ahead of anything the caller adds
        # afterwards (including a shebang line); kept for compatibility.
        self.envexport()

    def hardware(self, nodes=1, processors=1):
        """Add a single ``-l nodes=<n>:ppn=<p>`` resource directive.

        ``ppn`` is a property of the ``nodes`` resource, so it is joined to it
        with a colon instead of being written as a directive of its own.
        """
        self.initline(
            self.sufmap["nodes"] + str(nodes)
            + ":" + self.sufmap["processors"] + str(processors))
        return self

    def email_codes(self, abort=False, begin=False, end=False):
        """Add the ``-m`` directive selecting which events trigger mail.

        The letters ``a``, ``b`` and ``e`` are combined for abort, begin and
        end; ``n`` (no mail) is written when no event is selected, so the
        option never has an empty value.
        """
        selected = (("a", abort), ("b", begin), ("e", end))
        abe_string = "".join(code for code, wanted in selected if wanted) or "n"
        self.add_line("#PBS -m {}".format(abe_string))
        return self

    def envexport(self):
        """Add ``#PBS -V``."""
        self.add_line("#PBS -V")
        return self

    def out(self, stdout_file, **kwargs):
        """Send both the output (-o) and error (-e) streams to ``stdout_file``.

        Extra keyword arguments are accepted and ignored.
        """
        self.add_line("#PBS -o localhost:{}".format(stdout_file))
        self.add_line("#PBS -e localhost:{}".format(stdout_file))
        return self
class HabaneroSubmit(ClusterSubmit):
    """Submit-script builder that writes ``#SBATCH`` directives."""

    def __init__(self, *args, **kwargs):
        super(HabaneroSubmit, self).__init__(*args, **kwargs)

        def array_logfile(outdir, modelname):
            # Log-file path for array jobs (name ends in "-%a.out").
            return os.path.join(outdir, modelname + "-%a.out")

        def single_logfile(outdir, modelname):
            # Log-file path for non-array jobs (name ends in "-%j.out").
            return os.path.join(outdir, modelname + "-%j.out")

        # Replaces the base mapping wholesale; every base key is redefined.
        self.sufmap = dict(
            std_sufix="#SBATCH",
            time="--time=",
            nodes="--nodes=",
            processors="--ntasks-per-node=",
            mem="--mem=",
            groupname="--account=",
            outputdir="--output=",
            email="--mail-user=",
            jobname="--job-name=",
            array="--array=1-",
            outmap={True: array_logfile, False: single_logfile},
        )

    def envexport(self):
        """Add the ``--export=ALL`` directive."""
        self.initline("--export=ALL")
        return self

    def email(self, email):
        """Add the mail-user directive followed by ``--mail-type=END``."""
        directives = (self.sufmap["email"] + email, "--mail-type=END")
        for directive in directives:
            self.initline(directive)
        return self
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment