Created
January 16, 2012 21:25
-
-
Save etal/1623117 to your computer and use it in GitHub Desktop.
tasks.py & a demo script (use your own FASTA files)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
"""Demo for tasks.py.""" | |
import os | |
import sys | |
from glob import glob | |
from Bio import SeqIO | |
from tasks import Task, ext, sh | |
# Exit with a usage message rather than an IndexError traceback when the
# directory argument is missing.
if len(sys.argv) < 2:
    sys.exit("Usage: %s FASTA_DIRECTORY" % sys.argv[0])
# Directory whose *.fasta files are aligned and merged below.
TOPDIR = sys.argv[1]
def align_fasta(task):
    """Align the FASTA file specified in task.depends[0].

    Runs ``mafft --localpair --maxiterate 1000`` on that file and redirects
    MAFFT's standard output into ``task.target``.
    """
    try:
        from shlex import quote  # Python 3.3+
    except ImportError:
        from pipes import quote  # Python 2
    # Both paths come from a directory listing, so quote them: a space or a
    # shell metacharacter in a file name would otherwise break the command.
    sh("mafft --localpair --maxiterate 1000 %s > %s"
       % (quote(str(task.depends[0])), quote(str(task.target))))
def align_profiles(task):
    """Align the alignments specified in task.depends into task.target.

    Each dependency is handed to MAFFT as a ``--seed`` profile.  MAFFT's
    output is written next to the target with a ``.seq`` extension, which is
    then converted from FASTA to Clustal format at ``task.target``.
    """
    import tempfile
    try:
        from shlex import quote  # Python 3.3+
    except ImportError:
        from pipes import quote  # Python 2
    # Fool MAFFT with an empty input sequence set.  A private temporary file
    # (instead of ./mt.seq) avoids littering the working directory and
    # clashing with another run in the same directory.
    fd, empty_seq = tempfile.mkstemp(suffix='.seq')
    try:
        with os.fdopen(fd, 'w') as handle:
            handle.write('\n')  # same content ``echo > file`` produces
        # Profile-profile alignment...
        outseq = ext(task.target, 'seq')
        seeds = ' '.join('--seed %s' % quote(str(dep)) for dep in task.depends)
        sh("mafft --maxiterate 1000 %s %s > %s"
           % (seeds, quote(empty_seq), quote(outseq)))
    finally:
        os.remove(empty_seq)
    # Contrived use of a Python library
    SeqIO.convert(outseq, 'fasta',
                  task.target, 'clustal')
# Individually align each FASTA file in the given directory
subfamily_fastas = glob(os.path.join(TOPDIR, '*.fasta'))
if not subfamily_fastas:
    # With no inputs the profile step would be handed only an empty
    # placeholder; stop early with a clear message instead.
    sys.exit("No *.fasta files found in %s" % TOPDIR)
subfamily_results = [Task(ext(subfa, 'seq'),
                          action=align_fasta,
                          depends=subfa)
                     for subfa in sorted(subfamily_fastas)]

# Combine the sub-alignments; the per-file .seq alignments are intermediates
# removed by result.clean() once the merged alignment is built.
result = Task(TOPDIR.rstrip('/') + '.aln',
              action=align_profiles,
              depends=subfamily_results,
              cleans=ext(subfamily_results, 'seq'))
result.build()
result.clean()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""Mini-build system for managing dependencies in a computational pipeline. | |
See: http://etalog.blogspot.com/2012/01/building-analysis-how-to-avoid.html | |
""" | |
import logging | |
import os | |
import subprocess | |
from os.path import exists, isfile | |
def ext(path, extension):
    """Replace the extension on a path (or iterable of paths).

    A Task is replaced by its target path first; any other non-string is
    treated as an iterable and a list of results is returned.  Only the
    final path component's extension is replaced, so dots in directory names
    are left alone: ``ext('run.v2/out', 'seq')`` is ``'run.v2/out.seq'``.
    A path without an extension simply gets one appended.
    """
    try:
        string_types = basestring  # Python 2: str and unicode
    except NameError:
        string_types = str  # Python 3
    if isinstance(path, string_types):
        return os.path.splitext(path)[0] + '.' + extension
    if isinstance(path, Task):
        return ext(path.target, extension)
    return [ext(p, extension) for p in path]
def noext(path):
    """Strip the extension from a path (or iterable of paths).

    A Task is replaced by its target path first; any other non-string is
    treated as an iterable and a list of results is returned.  Only the
    final path component's extension is removed, so dots in directory names
    are left alone: ``noext('run.v2/out')`` is ``'run.v2/out'``.
    """
    try:
        string_types = basestring  # Python 2: str and unicode
    except NameError:
        string_types = str  # Python 3
    if isinstance(path, string_types):
        return os.path.splitext(path)[0]
    if isinstance(path, Task):
        return noext(path.target)
    return [noext(p) for p in path]
def sh(cmd):
    """Run *cmd* through the shell, raising if it fails.

    The command string is executed with ``shell=True`` so it may use pipes
    and redirection -- which also means it must never contain untrusted text.

    :raises OSError: if the command could not be started.
    :raises subprocess.CalledProcessError: if it exits with a non-zero status.
    """
    try:
        subprocess.check_call(cmd, shell=True)
    except (OSError, subprocess.CalledProcessError):
        logging.warning("*** Failed command: %s", str(cmd))
        # A bare ``raise`` keeps the original traceback (``raise exc`` loses
        # it on Python 2).
        raise
class Task(object):
    """Dependency specification, in terms of output, inputs, intermediates & process.

    :Parameters:
        `target`: string
            Filename or directory to build.
        `action`: callable
            Function to call to build the target, as
            ``action(task, **kwargs)``.  Implicitly uses self.depends as
            inputs and writes to self.target, though this isn't checked.
        `depends`: Task, string, or iterable of Tasks/strings
            Dependencies required to be up-to-date to build self.target.
            Task dependencies are built recursively on demand; string
            dependencies are input paths that must already exist.
            Converted to a list automatically, if needed.
        `cleans`: string or iterable of strings
            File names to be removed later by `clean`.  Converted to a list
            automatically.
        `kwargs`: dict
            Extra keyword arguments passed to `action` when building.

    Tasks compare and sort alphabetically by target name; a Task also
    compares equal to a plain string naming the same target.
    """

    # Types treated as a single path rather than an iterable of paths.
    try:
        _STRING_TYPES = (basestring,)  # Python 2: str and unicode
    except NameError:
        _STRING_TYPES = (str,)  # Python 3

    @staticmethod
    def _enlist(x):
        """Normalize None/empty, a single Task or path, or an iterable to a list."""
        if not x:
            return []
        if isinstance(x, (Task,) + Task._STRING_TYPES):
            return [x]
        return list(x)

    def __init__(self, target, action=lambda task, **kwargs: None,
                 depends=None, cleans=None, kwargs=None):
        # Explicit check instead of ``assert`` so it survives ``python -O``.
        if not callable(action):
            raise TypeError("action must be callable, got %r" % (action,))
        self.target = target  # Filename or directory to build
        self.action = action
        self.depends = self._enlist(depends)
        self.kwargs = kwargs or {}
        self.cleans = self._enlist(cleans)

    def build(self):
        """Bring self.target up to date, building stale dependencies first.

        Task dependencies that are not up to date are built recursively.  The
        action then runs unless the target already exists and is at least as
        new as every dependency.  Failures are logged and re-raised.
        """
        for dep in self.depends:
            if isinstance(dep, Task) and not dep.is_up_to_date():
                dep.build()
        if exists(self.target) and all(self.is_newer_than(dep)
                                       for dep in self.depends):
            logging.info("%s: Up to date.", self.target)
            return
        logging.info("%s: Building...", self.target)
        try:
            self.action(self, **self.kwargs)
        except BaseException:
            # Log which target failed (even on Ctrl-C), then propagate.
            logging.warning("*** Failed action @ %s", self.target)
            raise

    def is_newer_than(self, dep):
        """True if self.target exists and is at least as new as *dep*.

        *dep* may be a Task (its target is used) or a path; ``os.stat``
        raises OSError if that file is missing.
        """
        if not exists(self.target):
            return False
        return (os.stat(self.target).st_mtime
                >= os.stat(str(dep)).st_mtime)

    def is_up_to_date(self):
        """True if the target does not need to be rebuilt.

        The target must exist and, for every dependency, the dependency must
        be up to date (Task, checked recursively) or exist (path), and must
        be no newer than the target.
        """
        return exists(self.target) and all(
            (dep.is_up_to_date()
             if isinstance(dep, Task)
             else exists(dep))
            and self.is_newer_than(dep)
            for dep in self.depends)

    def clean(self):
        """Recursively delete auxiliary files listed in `cleans`."""
        for fname in self.cleans:
            if isfile(fname):
                logging.info('rm %s', fname)
                os.remove(fname)
        for dep in self.depends:
            if isinstance(dep, Task):
                dep.clean()

    # Ordering by target name.  Rich comparisons work on Python 2 and 3
    # (Python 3 ignores __cmp__ and has no cmp()).
    def __eq__(self, other):
        return self.target == str(other)

    def __ne__(self, other):
        return not self.__eq__(other)

    def __lt__(self, other):
        return self.target < str(other)

    def __le__(self, other):
        return self.target <= str(other)

    def __gt__(self, other):
        return self.target > str(other)

    def __ge__(self, other):
        return self.target >= str(other)

    def __hash__(self):
        # Equal objects must hash alike, and equality is by target name.
        return hash(self.target)

    def __cmp__(self, other):
        """Enable sorting alphabetically by target name (Python 2 protocol)."""
        return (self > other) - (self < other)

    def __str__(self):
        return self.target

    def __repr__(self):
        return '%s(%r)' % (type(self).__name__, self.target)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment