Last active
August 29, 2015 14:19
-
-
Save Autoplectic/9a4997dc67efed08366c to your computer and use it in GitHub Desktop.
A Python script that farms out tasks to a SLURM cluster.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
""" | |
You might need to run the following before running this script: | |
pip install --upgrade --user click clusterlib decorator logbook minibar pathlib sarge | |
Usage: | |
dispatcher.py ./script.py dispatch arg1 arg2 arg3 | |
""" | |
from __future__ import division, print_function | |
from imp import load_source | |
from sys import argv, executable as python, stdout | |
import click | |
from clusterlib.scheduler import queued_or_running_jobs, submit | |
from clusterlib.storage import sqlite3_dumps, sqlite3_loads | |
from decorator import decorator | |
from logbook import Logger, StreamHandler | |
from minibar import bar | |
from pathlib import Path | |
from sarge import get_both | |
# SLURM partitions accepted by ``dispatch --partition``.
VALID_PARTITIONS = "parallel bigmem serial".split()
# minibar template for the job-submission progress bar.
BAR_TEMPLATE = "Dispatching tasks: {i}/{total} {bar:fill}"
# Logger shared by both subcommands, and the handler that sends its records
# to stdout once it is bound as an application handler.
LOG, LOG_STREAM = Logger('Dispatcher'), StreamHandler(stdout)
@decorator
def autotype(func, *args, **kwargs):
    """
    Coerce string arguments into ints or floats, then call ``func``.

    Every positional and keyword argument goes through the same rule:

    * a string holding an integer literal becomes an exact ``int``;
    * anything else ``float()`` accepts becomes a ``float``, or an ``int``
      when the value is integral (e.g. ``"3.0"`` -> ``3``);
    * anything that cannot be converted is passed through unchanged.
    """
    def parse(arg):
        # Parse integer strings with int() first. Routing them through
        # float() silently rounds large values such as "12345678901234567891"
        # to the nearest double. The int() path is limited to strings so that
        # real floats (e.g. 2.5) are never truncated.
        if isinstance(arg, str):
            try:
                return int(arg)
            except ValueError:
                pass
        try:
            value = float(arg)
        except (TypeError, ValueError, OverflowError):
            # Not numeric (e.g. "abc"), not float()-able (e.g. None), or an
            # int too large for a double: leave it untouched.
            return arg
        return int(value) if value.is_integer() else value

    args = [parse(arg) for arg in args]
    kwargs = {key: parse(arg) for key, arg in kwargs.items()}
    return func(*args, **kwargs)
@click.group()
@click.pass_context
@click.argument('script')
def cli(ctx, script):
    """
    Command group. Builds the state shared by every subcommand: it imports
    the task script and builds the path of the sqlite3 "done jobs" database.

    SCRIPT is the path to a Python file. It is loaded as the module
    ``script`` and must define ``NAME``, which names the database file
    ``<NAME>.sqlite3`` in the current working directory.
    """
    # ctx.obj is normally seeded by ``cli(obj={})``. ensure_object() also
    # covers entry points that pass no obj, where ctx.obj would be None and
    # the item assignments below would raise TypeError.
    state = ctx.ensure_object(dict)
    # NOTE(review): ``imp.load_source`` is deprecated and the ``imp`` module
    # was removed in Python 3.12; consider importlib for newer interpreters.
    state['script'] = load_source('script', script)
    state['script_name'] = script
    db_file = Path('.') / "{}.sqlite3".format(state['script'].NAME)
    state['NOSQL_PATH'] = str(db_file.absolute())
def _as_param_tuple(param):
    """Return ``param`` as a sequence of values, wrapping scalars in a 1-tuple."""
    # Strings and bytes are indexable but represent a single value. Without
    # this check, "abc" would be expanded into the three arguments "a b c".
    if isinstance(param, (str, bytes)):
        return (param,)
    try:
        param[0]
    except (TypeError, IndexError, KeyError):
        # Not indexable (a number), empty, or a mapping: treat as one value.
        return (param,)
    return param


@cli.command()
@click.pass_context
@click.argument('args', nargs=-1)
@click.option('--mem')
@click.option('--partition', default="parallel", type=click.Choice(VALID_PARTITIONS))
def dispatch(ctx, args, mem, partition):
    """
    The primary function. It iterates over all parameters, and schedules all sets of parameters that
    are not running, queued, or finished (i.e. have not yet been run, or were killed).

    ARGS are coerced by ``autotype`` and passed to the script's ``params``
    function, which yields one parameter set per job. --mem overrides the
    requested memory (otherwise 2000 on the "serial" partition and 4000
    elsewhere). --partition is appended to each submission command.
    """
    script_module = ctx.obj['script']

    # write out a file with the exact command that called this function.
    with open('run_command.txt', 'w') as cmd_file:
        cmd_file.write(' '.join(argv))

    # collect queued, running, and finished jobs.
    scheduled_jobs = set(queued_or_running_jobs())
    done_jobs = sqlite3_loads(ctx.obj['NOSQL_PATH'])

    # iterate over all parameters, keeping only jobs that still need to run.
    jobs = []
    seen = set()
    for param in autotype(script_module.params)(*args):
        param_string = ' '.join(str(value) for value in _as_param_tuple(param))
        job_name = "{}_{}".format(script_module.NAME, param_string.replace(' ', '_'))
        # Also skip names already collected in this call, so a parameter set
        # yielded twice by ``params`` is not submitted twice.
        if job_name in seen or job_name in scheduled_jobs or job_name in done_jobs:
            continue
        seen.add(job_name)
        job_cmd = "{} {} {} main {}".format(python, __file__, ctx.obj['script_name'], param_string)
        jobs.append((job_cmd, job_name))

    # dispatch all jobs that need dispatching.
    default_mem = 2000 if partition == "serial" else 4000
    for job_cmd, job_name in bar(jobs, template=BAR_TEMPLATE):
        # NOTE(review): SLURM treats a zero time limit as "no limit";
        # confirm that is the intent of "00:00:00".
        script = submit(job_command=job_cmd,
                        job_name=job_name,
                        time="00:00:00",
                        memory=(mem if mem else default_mem),
                        backend='slurm')
        script += " --partition={}".format(partition)
        # get_both returns (stdout, stderr). Surface stderr instead of
        # discarding it, so failed submissions are not silently lost.
        _, stderr = get_both(script)
        if stderr:
            LOG.warning("Submission of {} reported: {}".format(job_name, stderr.strip()))
@cli.command()
@click.pass_context
@click.argument('args', nargs=-1)
def main(ctx, args):
    """
    Run one task; this is the command each submitted SLURM job executes.

    ARGS are coerced by ``autotype`` and passed to the script's ``compute``
    function. Only after ``compute`` returns is the job recorded in the
    sqlite3 database, under ``<NAME>_<ARGS joined by "_">``. This mirrors
    the job name ``dispatch`` builds from the same parameter string.
    """
    state = ctx.obj
    task_module = state['script']
    done_key = "{}_{}".format(task_module.NAME, '_'.join(args))
    LOG.info("Arguments: {}".format(args))
    run_task = autotype(task_module.compute)
    run_task(*args)
    LOG.info("Finished.")
    sqlite3_dumps({done_key: "JOB DONE"}, state['NOSQL_PATH'])
if __name__ == '__main__':
    # Bind LOG_STREAM for the whole CLI run so LOG records reach stdout.
    # The empty dict becomes ctx.obj, the state shared by cli and its
    # subcommands.
    shared_state = {}
    with LOG_STREAM.applicationbound():
        cli(obj=shared_state)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment