Skip to content

Instantly share code, notes, and snippets.

@willfurnass
Last active January 11, 2017 13:40
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save willfurnass/0c24fd13f2ee2535fbae9a0340cd9d3a to your computer and use it in GitHub Desktop.
Save willfurnass/0c24fd13f2ee2535fbae9a0340cd9d3a to your computer and use it in GitHub Desktop.
Ruffus + DRMAA test
#!/usr/bin/env python
# A test of whether Ruffus can be used to submit jobs to a cluster using the
# DRMAA API, sharing the DRMAA session between threads to allow multiple jobs to
# be submitted at once without burdening the head node with multiple DRMAA
# sessions.
# This can be run from a conda environment created using:
# conda create -n drmaatest -c bioconda python=2.7 ruffus drmaa
from __future__ import print_function
from ruffus import transform, suffix, pipeline_run, cmdline
from ruffus.drmaa_wrapper import run_job
import drmaa
import sys
# A single DRMAA session is opened up front; the pipeline tasks below all pass
# this same object to run_job instead of opening sessions of their own.
session = drmaa.Session()
session.initialize()

# Pipeline inputs. Each file is expected to contain multiple lines with one
# word per line.
starting_files = [
    "a.stg1",
    "b.stg1",
    "c.stg1",
]

# Logger (and its accompanying mutex) which can be passed to multiprocessing
# Ruffus tasks; it is set up to write to 'test.log'.
logger, logger_mutex = cmdline.setup_logging(__name__, "test.log", True)
@transform(starting_files, suffix(".stg1"), ".stg2")
def stg1_func(input_file, output_file):
    """Run a job that writes the first three lines of the input to the output.

    The shell command ``head -n 3`` is handed to ``run_job`` together with the
    module-level DRMAA ``session`` and ``logger``.

    :param input_file: path of a ``.stg1`` file to read.
    :param output_file: path of the ``.stg2`` file to create.
    """
    command = "head -n 3 {} > {}".format(input_file, output_file)
    # Settings forwarded unchanged to run_job. The job_other_options string is
    # presumably scheduler-specific (it looks like Grid Engine syntax) --
    # confirm against the target cluster.
    job_settings = dict(
        job_name="stage1",
        drmaa_session=session,
        job_other_options="-V -l rmem=50M",
        job_script_directory=".",
        logger=logger,
        retain_job_scripts=True,
    )
    run_job(command, **job_settings)
@transform(stg1_func, suffix(".stg2"), ".stg3")
def stg2_func(input_file, output_file):
    """Run a job that joins the lines of the input into the output file.

    The shell command ``paste -sd' '`` (serial paste with a space delimiter)
    is handed to ``run_job`` together with the module-level DRMAA ``session``
    and ``logger``.

    :param input_file: path of a ``.stg2`` file produced by ``stg1_func``.
    :param output_file: path of the ``.stg3`` file to create.
    """
    command = "paste -sd' ' {} > {}".format(input_file, output_file)
    # Settings forwarded unchanged to run_job. The job_other_options string is
    # presumably scheduler-specific (it looks like Grid Engine syntax) --
    # confirm against the target cluster.
    job_settings = dict(
        job_name="stage2",
        drmaa_session=session,
        job_other_options="-V -l rmem=50M",
        job_script_directory=".",
        logger=logger,
        retain_job_scripts=True,
    )
    run_job(command, **job_settings)
if __name__ == '__main__':
    # Run the pipeline up to its final stage using three worker threads;
    # threads (rather than processes) let the tasks share the one DRMAA
    # session created at module level.
    try:
        pipeline_run(target_tasks=[stg2_func], multithread=3)
    finally:
        # Close the DRMAA session even when the pipeline raises, so a failed
        # run does not leave the session open.
        session.exit()
    sys.exit()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment