Skip to content

Instantly share code, notes, and snippets.

@wasade
Created February 19, 2013 20:56
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save wasade/4989820 to your computer and use it in GitHub Desktop.
Save wasade/4989820 to your computer and use it in GitHub Desktop.
Updated cluster_jobs.py for use on some Torque based clusters
#!/bin/env python
#cluster_jobs.py
"""
Simple helper script to make pbs jobs and submit to the cluster
Will only work if run on bmf or hemisphere
Can run from command line or import into python code.
Author: Micah Hamady
Status: Prototype
"""
from os import makedirs, removedirs
from commands import getoutput
from string import strip
from time import sleep
from random import choice
__author__ = "Micah Hamady"
__copyright__ = "Copyright 2012, PyCogent"
__credits__ = ["Micah Hamady", "Daniel McDonald"]
__license__ = "GPL"
__version__ = "1.5.3-dev"
__maintainer__ = "Daniel McDonald"
__email__ = "mcdonadt@colorado.edu"
__status__ = "Development"
# module constants
# qsub template
QSUB_TEXT = """# Walltime Limit: hh:nn:ss
#PBS -l walltime=%s
# Node Specification: Two CPUs on one node
#PBS -l ncpus=%d -l nodes=%d%s -l pvmem=%s
# Mail: options are (a) aborted, (b) begins execution, (e) ends execution
# use -M <email> for additional recipients
# Queue: Defaults to friendlyq
#PBS -q %s
# supress email notification
#PBS -m n
# Job Name:
#PBS -N %s
#PBS -k %s
# discard output
%s
######################################################################
# Executable commands go here. Make sure that mpirun -np and -npn match the
# number of processors you requested from PBS! If you are using ppn and want
# npn=2, then you can leave -np and -npn off and PBS will add it for you. If
# you want npn=1, then you have to request all of the processors on all of the
# nodes, and then force it to use less using -npn.
echo ------------------------------------------------------
echo PBS: qsub is running on $PBS_O_HOST
echo PBS: originating queue is $PBS_O_QUEUE
echo PBS: executing queue is $PBS_QUEUE
echo PBS: working directory is $PBS_O_WORKDIR
echo PBS: execution mode is $PBS_ENVIRONMENT
echo PBS: job identifier is $PBS_JOBID
echo PBS: job name is $PBS_JOBNAME
echo PBS: node file is $PBS_NODEFILE
echo PBS: current home directory is $PBS_O_HOME
echo PBS: PATH = $PBS_O_PATH
echo ------------------------------------------------------
cd $PBS_O_WORKDIR
%s
"""
# qsub command
QSUB_CMD = "qsub %s"
#QSUB_CMD = "/usr/pbs/bin/qsub %s"
# command line usage string
USAGE_STR = """Usage: cluster_jobs.py <-m|-ms|-clean|-kill> [<name of commands file> <jobs prefix>]
Options:
-m just make jobs. this will create a "jobs" directory in the current
directory and write out the pbs job files there.
-ms make and submit jobs.
-clean clean up temp and output files (std and stderrror) from previous
run(s).
-cleansave clean up temp and output files (std and stderrror) from previous
run(s) and move to saved directory
-kill kill all of queued or running jobs for current user.
The following are required if -m or -ms specified (2nd and 3rd arguments):
<name of commands file> name of file with one command per line.
<jobs_prefix> prefix of jobs, 10 chars. e.g. micah_blat
"""
class ClusterJobs:
"""
Wrapper to queue managament system.
Creates and/or sumbits jobs to cluster.
"""
# bmf typical settings
def __init__(self, commands_list, jobs_prefix, jobs_dir="jobs",
ppn=1, nnodes=1, ncpus=1, num_jobs=50, wall_time="999:00:00",
queue_name="memroute", job_range=None,
use_mpirun = False, qstat_prefix = "compy", delay_secs=5.0,
verbose=True, keep_output="oe", discard_output=False,
memory='4gb'):
# hemisphere typical settings
#def __init__(self, commands_list, jobs_prefix, jobs_dir="jobs",
# ppn=1, nnodes=1, ncpus=1, num_jobs=48, wall_time="144:00:00",
# queue_name="friendlyq", job_range=(16,32),
# use_mpirun = False, qstat_prefix = "hemispher",
# delay_secs=1.0, verbose=False, keep_output="oe", discard_output=False):
"""
Initialize jobs
commands_list: list of commands to run on cluster
**PLEASE NOTE** path to executable must be fully qualified.
e.g. to run a python script, the job must look something like
/usr/local/bin/python2.4 my_script.py 'script params'
**ALSO NOTE** If your python (et al) script takes parameters from
the command line, and you are checking the number of parameters
you should be aware that mpirun will add parameters after the
parameters you have passed.
(e.g. if len(argv) != 4: <do something>, you will need to modify
your code)
jobs_prefix: job prefix. must be <= 10 characters. no spaces. first
character must be a-Z. this is what you will see in the job queue,
and will be the prifix of all of the standard out and standard
error output. required.
jobs_dir: path to directory to store generated jobs. default = "jobs"
ppn: number of processors per node to use, default = 1
nnodes: number of nodes per job, default = 1
ncpus: number of cpus total to use
num_jobs: number of jobs to make, default = 16
wall_time: maximum time the job is allowed to run. friendlyq limit is
72 hours. if debugging a program that might hang (e.g. an mpi
program that blocks) it is a good idea to set this value very low,
e.g. 5 minutes. format is HH:MM:SS. default = 12 hours
queue_name: name of queue to submit jobs to. default = friendlyq
job_range: only create/submit jobs in this range. tuple (start, stop)
half open. e.g. includes first index, excludes last)
use_mpirun: will run jobs using mpi run. this will send your command
through mpirun shell script. both mpi and non-mpi programs will
run fine when this is True. default = True
qstat_prefix: prefix of the qstat host. e.g. execute qstat and you
will see something like 7737.bmf or 204544.hemispher
the qstat_prefix would be the text after the number, so
on bmf, the qstat_prefix is "bmf"
on hemisphere the qstat_prefix is currently "hemispher"
delay_secs: time delay between submitting jobs - helps to avoid
pbs crashes
keep_output: keep standard error, standard out, both, or neither
o=std out, e=std err, oe=both, n=neither
verbose: when True, will print out progress, otherwise will be silent
discard_output: when True, output will write to temp file. Using
when running as apache user (word dir env vars stripped) and
-k n is ignored. If not used, there is a delay when job
finishes running but output cannot be delivered.
memory: memory to request
"""
if not commands_list:
raise ValueError, "You must specify a list of commands."
if not jobs_prefix or len(jobs_prefix) > 10:
raise ValueError, "Jobs prefix must be a 1-10 characters long."
if not jobs_prefix[0].isalpha():
raise ValueError, "First letter of prefix must be a-Z."
if ppn < 1 or ppn > 2:
print "WARNING! Possible invalid ppn value. %d" % ppn
if nnodes < 1 or nnodes > 8:
print "WARNING! Possible invalid number of nodes. %d" % nnodes
if num_jobs < 1:
raise ValueError, "You must specify > 1 jobs to create."
if ncpus < 1:
raise ValueError, "You must specify > 1 cpus to use"
if num_jobs > 256:
print "WARNING! Large number of jobs specified! %d" % num_jobs
# make sure that absolute path to executable is specified
#if len(filter(lambda x: x.startswith("/"), commands_list)) != \
# len(commands_list):
# raise ValueError, "One or more commands not fully qualified."
# try to make directory for jobs
try:
makedirs(jobs_dir)
except OSError:
# if directory already exists, ignore
pass
if keep_output not in ["e", "o", "oe", "eo", "n"]:
raise ValueError, "Invalid keep_output option."
# make sure jobs dir has training slash
if not jobs_dir.endswith("/"):
jobs_dir += "/"
self.JobsDir = jobs_dir
# store values
self.CommandList = commands_list
self.JobsPrefix = jobs_prefix
self.ProcsPerNode = ppn
self.NumNodes = nnodes
self.NumCpus = ncpus
self.NumJobs = num_jobs
self.WallTime = wall_time
self.QueueName = queue_name
self.JobRange = job_range
self.UseMPIRun = use_mpirun
self.JobFilenames = []
self.QstatPrefix = qstat_prefix
self.DelaySecs = delay_secs
self.Verbose = verbose
self.KeepOutput = keep_output
self.DiscardOutput = discard_output
self.Memory = memory
def makeJobs(self):
"""
Write out jobs
"""
out = []
out.append("Trying to make %d jobs." % self.NumJobs)
out.append("Writing jobs to: %s" % self.JobsDir)
out.append("Delay between jobs: %.2f" % self.DelaySecs)
ct = 0
job_lists = {}
# for each command, add to growing list of jobs
for cmd in map(strip, self.CommandList):
cur_job = ct % self.NumJobs
# check if new job
if not cur_job in job_lists:
job_lists[cur_job] = []
# if UseMPIRun, prefix command with mpirun command
mpirun = "mpirun -np %d " % self.NumCpus
if not self.UseMPIRun:
mpirun = ""
# append command to current job
job_lists[cur_job].append(mpirun + cmd)
ct += 1
# write out pbs files, cache job filenames
for jkey, jlist in job_lists.items():
job_out = "%s%s%d.pbs" %(self.JobsDir,
self.JobsPrefix,
jkey)
self.JobFilenames.append(job_out)
o_file = open(job_out, "w+")
ppn_text = ""
if self.ProcsPerNode > 1:
ppn_text = ":ppn=%d" % self.ProcsPerNode
discard_out = ""
if self.DiscardOutput:
picks = list('abcdefghigklmnopqrstuvwxyz0123456789')
base = ''.join([choice(picks) for i in range(9)]) + ".discard"
rerr = "e" + base
rout = "o" + base
discard_out = "#PBS -o /tmp/%s\n#PBS -e /tmp/%s" % (rout, rerr)
o_file.write(QSUB_TEXT % (self.WallTime,
self.NumCpus,
self.NumNodes,
ppn_text,
self.Memory,
self.QueueName,
"%s%d" % (self.JobsPrefix, jkey),
self.KeepOutput,
discard_out,
'\n'.join(jlist)))
o_file.close()
return '\n'.join(out)
def submitJobs(self):
"""
Submit all jobs.
"""
if not self.JobFilenames:
raise ValueError, "No job filenames available. Have you run makeJobs yet?"
out = []
job_ids = []
out.append("Submitting %d jobs to %s queue!" % (len(self.JobFilenames),
self.QueueName))
start_ix = 0
stop_ix = len(self.JobFilenames)
if self.JobRange:
start_ix, stop_ix = self.JobRange
for file_pt in zip(range(len(self.JobFilenames)), self.JobFilenames):
ct, p_file = file_pt
if ct < start_ix or ct >= stop_ix:
if self.Verbose:
print "skipping: %d" % ct
continue
if self.Verbose:
print "Submitting: ", p_file
#raise ValueError, "Submitting: %s" % (QSUB_CMD % p_file)
out_id = getoutput(QSUB_CMD % p_file)
out.append(out_id)
job_ids.append(out_id)
sleep(self.DelaySecs)
out.append("done.\nType 'qstat' or 'qstat -a' to check status of your jobs.")
return '\n'.join(out), job_ids
from cogent.util.misc import parse_command_line_parameters
from optparse import make_option
script_info={}
script_info['brief_description']="""Submit jobs to a PBS/Torque cluster"""
script_info['script_description']="""This script consumes a list of command lines and partitions those commands into a specified number of jobs. Within each job, the commands are run serially."""
script_info['script_usage']=[]
script_info['required_options'] = [\
make_option('-i','--input',help="file containing the input commands",
type='str'),
make_option('-p','--prefix',help="submission prefix", type='str'),
make_option('-t','--runtype',help="what to do: make or make_submit "\
"[default:%default]",type='choice', choices=['make','make_submit'],
default='make')]
script_info['optional_options'] = [\
make_option('-j','--jobs',help="make N jobs", type='int', default=20),
make_option('-m','--mem',help="memory per job",type='str',default='2gb'),
make_option('-q','--queue',help="submit queue",type='str',
default='memroute'),
make_option('-w','--walltime',help='walltime per job',type='str',
default='120:00:00'),
make_option('-d','--delay',help='job submission delay',type='float',
default=0.1)]
script_info['version'] = __version__
if __name__ == "__main__":
option_parser, opts, args = parse_command_line_parameters(**script_info)
cj = ClusterJobs(open(opts.input), opts.prefix, num_jobs=opts.jobs,
queue_name=opts.queue,delay_secs=opts.delay,
wall_time=opts.walltime,memory=opts.mem)
if opts.runtype == 'make' or opts.runtype == 'make_submit':
cj.makeJobs()
if opts.runtype == 'make_submit':
cj.submitJobs()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment