Skip to content

Instantly share code, notes, and snippets.

@Autoplectic
Last active August 29, 2015 14:19
Show Gist options
  • Save Autoplectic/9a4997dc67efed08366c to your computer and use it in GitHub Desktop.
Save Autoplectic/9a4997dc67efed08366c to your computer and use it in GitHub Desktop.
A Python script to farm out tasks to a SLURM cluster.
#!/usr/bin/env python
"""
You might need to run the following before running this script:
pip install --upgrade --user click clusterlib decorator logbook minibar pathlib sarge
Usage:
dispatcher.py ./script.py dispatch arg1 arg2 arg3
"""
from __future__ import division, print_function
from imp import load_source
from sys import argv, executable as python, stdout
import click
from clusterlib.scheduler import queued_or_running_jobs, submit
from clusterlib.storage import sqlite3_dumps, sqlite3_loads
from decorator import decorator
from logbook import Logger, StreamHandler
from minibar import bar
from pathlib import Path
from sarge import get_both
# Partition names accepted by ``dispatch --partition``.
VALID_PARTITIONS = ["parallel", "bigmem", "serial"]

# minibar template used for the job-submission progress bar in ``dispatch``.
BAR_TEMPLATE = "Dispatching tasks: {i}/{total} {bar:fill}"

# Module logger, plus a handler writing to stdout; the handler is only
# activated by the ``applicationbound()`` context in the ``__main__`` guard.
LOG = Logger("Dispatcher")
LOG_STREAM = StreamHandler(stdout)
@decorator
def autotype(func, *args, **kwargs):
    """
    Coerce string arguments into ints or floats, then call ``func``.

    Every positional and keyword argument goes through ``_coerce_number``:
    values that parse as integral numbers ("3", "3.0", "1e3") become ``int``,
    other numeric values become ``float``, and anything that does not parse
    as a number is passed through unchanged.
    """
    args = [_coerce_number(arg) for arg in args]
    kwargs = {key: _coerce_number(value) for key, value in kwargs.items()}
    return func(*args, **kwargs)


def _coerce_number(arg):
    """
    Return ``arg`` as an int or float if it looks numeric, otherwise ``arg`` itself.
    """
    try:
        value = float(arg)
    except (TypeError, ValueError):
        # Not a number (or not something float() accepts at all): keep as-is.
        return arg
    if not value.is_integer():
        # Covers genuine fractions as well as "inf"/"nan".
        return value
    # A float only has 53 bits of mantissa, so int(value) would round large
    # integers such as "12345678901234567891". Re-parse the original for an
    # exact result; forms int() rejects ("1e3", "2.0") fall back to the float.
    try:
        return int(arg)
    except (TypeError, ValueError):
        return int(value)
@click.group()
@click.pass_context
@click.argument('script')
def cli(ctx, script):
    """
    Command group. Builds the state needed by all commands: it imports the task
    script and computes the path of the finished-jobs database.

    Populates ``ctx.obj`` with:
      * ``'script'``      -- the task script loaded as a module (its ``NAME`` is
                             read here; the subcommands use ``params``/``compute``),
      * ``'script_name'`` -- the script path exactly as given on the command line,
      * ``'NOSQL_PATH'``  -- absolute path of ``<NAME>.sqlite3`` in the current
                             working directory.
    """
    # Create ctx.obj if the group was invoked without ``obj={}`` rather than
    # failing with a TypeError on ``None[...]``; an existing dict is reused.
    obj = ctx.ensure_object(dict)
    obj['script'] = load_source('script', script)
    obj['script_name'] = script
    obj['NOSQL_PATH'] = str((Path('.') / "{}.sqlite3".format(obj['script'].NAME)).absolute())
def _as_param_tuple(param):
    """
    Return ``param`` as a sequence of values to put on a job's command line.

    The task script's ``params`` may yield either a sequence of values or a
    single scalar. Scalars are wrapped in a one-element tuple. Strings are
    scalars here even though they are indexable; otherwise "abc" would be
    split into the three arguments "a b c".
    """
    if isinstance(param, (str, bytes, type(u''))):
        return (param,)
    try:
        param[0]
    except (TypeError, LookupError):
        # Not indexable (TypeError) or empty / no key 0 (LookupError).
        return (param,)
    return param


@cli.command()
@click.pass_context
@click.argument('args', nargs=-1)
@click.option('--mem')
@click.option('--partition', default="parallel", type=click.Choice(VALID_PARTITIONS))
def dispatch(ctx, args, mem, partition):
    """
    The primary function. It iterates over all parameters, and schedules all sets of parameters that
    are not running, queued, or finished (i.e. have not yet been run, or were killed).

    ``args`` are passed (numeric-coerced by ``autotype``) to the task script's
    ``params``. ``--mem`` overrides the default memory request; ``--partition``
    is appended to each submission command.
    """
    # Write out a file with the exact command that called this function.
    with open('run_command.txt', 'w') as cmd_file:
        cmd_file.write(' '.join(argv))

    task = ctx.obj['script']
    # Job names currently queued or running, and those recorded as finished by ``main``.
    scheduled_jobs = set(queued_or_running_jobs())
    done_jobs = sqlite3_loads(ctx.obj['NOSQL_PATH'])

    jobs = []
    for param in autotype(task.params)(*args):
        param = _as_param_tuple(param)
        param_string = ' '.join(str(value) for value in param)
        job_cmd = "{} {} {} main {}".format(python, __file__, ctx.obj['script_name'], param_string)
        # Must match the key ``main`` rebuilds from its argv ('_'.join(args)),
        # so finished jobs are recognised in ``done_jobs`` on the next dispatch.
        job_name = "{}_{}".format(task.NAME, param_string.replace(' ', '_'))
        if job_name not in scheduled_jobs and job_name not in done_jobs:
            jobs.append((job_cmd, job_name))

    # Memory request for clusterlib's submit (presumably MB): --mem if given,
    # otherwise 2000 on the serial partition and 4000 elsewhere.
    memory = mem if mem else (2000 if partition == "serial" else 4000)

    # Dispatch all jobs that need dispatching.
    for job_cmd, job_name in bar(jobs, template=BAR_TEMPLATE):
        # NOTE(review): sbatch documents a zero time limit as "no limit";
        # confirm the cluster's policy permits it.
        submit_cmd = submit(job_command=job_cmd,
                            job_name=job_name,
                            time="00:00:00",
                            memory=memory,
                            backend='slurm')
        # submit() returns a shell command string; the partition flag is
        # appended to its end (presumably landing among the sbatch options).
        submit_cmd += " --partition={}".format(partition)
        _, stderr = get_both(submit_cmd)
        if stderr:
            # Previously discarded; surface submission failures instead.
            LOG.warning("Submitting {} reported: {}".format(job_name, stderr.strip()))
@cli.command()
@click.pass_context
@click.argument('args', nargs=-1)
def main(ctx, args):
    """
    Run a single parameter set of the task script; this is what each compute
    node executes.

    The raw command-line ``args`` are numeric-coerced by ``autotype`` and handed
    to the script's ``compute``. Only once ``compute`` returns is the key
    ``<NAME>_<arg1>_<arg2>...`` written to the sqlite file with the value
    "JOB DONE"; ``dispatch`` skips job names found there.
    """
    task = ctx.obj['script']
    finished_key = "{}_{}".format(task.NAME, '_'.join(args))
    LOG.info("Arguments: {}".format(args))
    run = autotype(task.compute)
    run(*args)
    LOG.info("Finished.")
    sqlite3_dumps({finished_key: "JOB DONE"}, ctx.obj['NOSQL_PATH'])
if __name__ == '__main__':
    # Push the stdout handler for the whole run, then let click parse argv,
    # starting from an empty dict as the shared context object.
    with LOG_STREAM.applicationbound():
        cli(obj=dict())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment