Skip to content

Instantly share code, notes, and snippets.

Last active August 29, 2015 14:19
Show Gist options
  • Save Autoplectic/9a4997dc67efed08366c to your computer and use it in GitHub Desktop.
Save Autoplectic/9a4997dc67efed08366c to your computer and use it in GitHub Desktop.
A Python script to farm out tasks to a SLURM cluster.
#!/usr/bin/env python
"""
You might need to run the following before running this script:

    pip install --upgrade --user click clusterlib decorator logbook minibar pathlib sarge

Usage: ./<this_script> <task_script> dispatch arg1 arg2 arg3
"""
from __future__ import division, print_function

from imp import load_source
from sys import argv, executable as python, stdout

import click
from clusterlib.scheduler import queued_or_running_jobs, submit
from clusterlib.storage import sqlite3_dumps, sqlite3_loads
from decorator import decorator
from logbook import Logger, StreamHandler
from minibar import bar
from pathlib import Path
from sarge import get_both
# Partition names accepted by the `--partition` option of `dispatch`.
VALID_PARTITIONS = ['parallel', 'bigmem', 'serial']
# Progress-bar template handed to minibar's `bar` while jobs are being submitted.
BAR_TEMPLATE = "Dispatching tasks: {i}/{total} {bar:fill}"
# Logger named 'Dispatcher'.
LOG = Logger('Dispatcher')
# Log handler writing to stdout; it is bound application-wide in the `__main__` guard.
LOG_STREAM = StreamHandler(stdout)
def autotype(func, *args, **kwargs):
    """
    Coerce string arguments into ints or floats, then call `func` with them.

    Every positional and keyword argument is passed through a numeric parser:
    values that represent a whole number become `int`, other numeric values
    become `float`, and anything that is not numeric is forwarded unchanged.

    NOTE(review): `dispatch` calls this as `autotype(f)(*args)`, which only
    works if this function is wrapped by `decorator` (imported at the top of
    the file). No `@decorator` line is visible above this definition --
    confirm it was not lost.

    :param func: the callable to invoke with the coerced arguments.
    :param args: positional arguments to coerce and forward.
    :param kwargs: keyword arguments to coerce and forward.
    :return: whatever `func` returns.
    """
    def parse(arg):
        """Return `arg` as an int or float if it is numeric, else unchanged."""
        try:
            number = float(arg)
        except (TypeError, ValueError):
            # Not numeric (or not a string/number at all, e.g. None): pass through.
            return arg
        if not number.is_integer():
            return number
        try:
            # Prefer the exact integer parse so that large whole numbers are
            # not rounded by the trip through float.
            return int(arg)
        except (TypeError, ValueError):
            # Whole-valued but not an integer literal, e.g. "3.0" or "1e3".
            return int(number)

    coerced_args = [parse(arg) for arg in args]
    coerced_kwargs = {key: parse(arg) for key, arg in kwargs.items()}
    return func(*coerced_args, **coerced_kwargs)
def cli(ctx, script):
    """
    Command group. Constructs things needed by all commands: imports the script and builds the
    nosql database path.

    Stores three entries in `ctx.obj`:

    * 'script'      -- the module loaded from the file path `script`,
    * 'script_name' -- the `script` path exactly as given,
    * 'NOSQL_PATH'  -- absolute path of "<NAME>.sqlite3" in the current directory, where NAME is
                       the `NAME` attribute of the loaded script.

    NOTE(review): the `ctx` parameter and the "command group" wording imply click decorators
    (group / argument / pass_context) that are not visible above this definition -- confirm they
    were not lost.

    :param ctx: the click context; its `obj` is used as a shared dict.
    :param script: path of the task script to load.
    """
    # ctx.obj is None unless the caller supplied one; make sure it is a dict before indexing.
    ctx.ensure_object(dict)

    task_module = load_source('script', script)
    db_file = "{}.sqlite3".format(task_module.NAME)

    ctx.obj['script'] = task_module
    ctx.obj['script_name'] = script
    ctx.obj['NOSQL_PATH'] = str((Path('.') / db_file).absolute())
@click.argument('args', nargs=-1)
@click.option('--partition', default="parallel", type=click.Choice(VALID_PARTITIONS))
def dispatch(ctx, args, mem, partition):
    """
    The primary function. It iterates over all parameters, and schedules all sets of parameters that
    are not running, queued, or finished (e.g. have not yet been run, or were killed).

    NOTE(review): `ctx` and `mem` have no visible click decorator supplying them (pass_context,
    a `--mem` option, the command registration) -- confirm those lines were not lost.

    :param ctx: click context; `ctx.obj` must hold 'script', 'script_name' and 'NOSQL_PATH'.
    :param args: command-line arguments forwarded (after numeric coercion) to the script's `params`.
    :param mem: memory request forwarded to `submit`; when falsy, 2000 is used for the "serial"
                partition and 4000 otherwise.
    :param partition: partition name appended to the submission command.
    """
    # Write out a file with the exact command that called this function.
    with open('run_command.txt', 'w') as cmd_file:
        cmd_file.write(' '.join(argv))

    task_module = ctx.obj['script']
    script_name = ctx.obj['script_name']

    # Collect queued, running, and finished jobs.
    scheduled_jobs = set(queued_or_running_jobs())
    done_jobs = sqlite3_loads(ctx.obj['NOSQL_PATH'])

    # Iterate over all parameters, keeping those that still need to run.
    jobs = []
    for param in autotype(task_module.params)(*args):
        # A bare value becomes a 1-tuple; a tuple/list of values is kept so that each element
        # becomes its own command-line argument.
        # NOTE(review): SOURCE wrapped unconditionally, which makes the join below pointless and
        # mangles multi-value parameters -- confirm this is the intended behaviour.
        if not isinstance(param, (tuple, list)):
            param = (param,)
        param_string = ' '.join(str(_) for _ in param)
        job_cmd = "{} {} {} main {}".format(python, __file__, script_name, param_string)
        job_name = "{}_{}".format(task_module.NAME, param_string.replace(' ', '_'))
        if job_name not in scheduled_jobs and job_name not in done_jobs:
            jobs.append((job_cmd, job_name))

    # The memory request does not depend on the job, so compute it once.
    memory = mem if mem else (2000 if partition == "serial" else 4000)

    # Dispatch all jobs that need dispatching.
    for job_cmd, job_name in bar(jobs, template=BAR_TEMPLATE):
        # The job must be submitted under `job_name`, otherwise the queued/running check above
        # can never recognise it on a later run.
        script = submit(job_command=job_cmd,
                        job_name=job_name,
                        memory=memory)
        script += " --partition={}".format(partition)
        # NOTE(review): SOURCE built `script` but never ran it; `get_both` (imported from sarge
        # and otherwise unused) is assumed to be the intended runner -- confirm.
        get_both(script)
@click.argument('args', nargs=-1)
def main(ctx, args):
    """
    This function is run on every compute node.

    Builds the job name from the script's `NAME` and the raw arguments, logs the arguments, and
    records the job as done in the sqlite database at `ctx.obj['NOSQL_PATH']`.

    NOTE(review): `ctx` has no visible click decorator supplying it (pass_context, the command
    registration) -- confirm those lines were not lost.

    :param ctx: click context; `ctx.obj` must hold 'script' and 'NOSQL_PATH'.
    :param args: the parameter values for this job, as command-line strings.
    """
    job_name = "{}_{}".format(ctx.obj['script'].NAME, '_'.join(args))

    # NOTE(review): SOURCE had only a dangling `"Arguments: {}".format(args))` fused onto the
    # previous statement; a log call is assumed -- confirm the intended level.
    LOG.info("Arguments: {}".format(args))

    # NOTE(review): nothing visible here actually runs the task script with `args` before the
    # job is marked as done -- confirm whether that call was lost.
    sqlite3_dumps({job_name: "JOB DONE"}, ctx.obj['NOSQL_PATH'])
if __name__ == '__main__':
    # Keep the stdout log handler bound for the whole application while the CLI runs.
    with LOG_STREAM.applicationbound():
        # NOTE(review): the statement under this `with` is missing from SOURCE. Invoking `cli`
        # with an empty dict as the context object is assumed, since `cli` stores its shared
        # state in `ctx.obj` -- confirm.
        cli(obj={})
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment