Skip to content

Instantly share code, notes, and snippets.

@etal
Created January 16, 2012 21:25
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save etal/1623117 to your computer and use it in GitHub Desktop.
Save etal/1623117 to your computer and use it in GitHub Desktop.
tasks.py & a demo script (use your own FASTA files)
#!/usr/bin/env python
"""Demo for tasks.py."""
import os
import sys
from glob import glob
from Bio import SeqIO
from tasks import Task, ext, sh
# The only command-line argument is the directory holding the *.fasta files.
# Fail with a usage message instead of an IndexError traceback when it is missing.
if len(sys.argv) < 2:
    sys.exit("Usage: %s DIRECTORY\n"
             "Aligns each DIRECTORY/*.fasta file separately, then merges the "
             "alignments into DIRECTORY.aln" % sys.argv[0])
TOPDIR = sys.argv[1]
def align_fasta(task):
    """Align the FASTA file specified in task.depends[0] into task.target.

    Runs ``mafft --localpair --maxiterate 1000`` on the input and redirects
    its output to the target file. Both paths are shell-quoted, because the
    command goes through the shell and file names taken from a directory
    listing may contain spaces or shell metacharacters.
    """
    try:
        from shlex import quote  # Python 3.3+
    except ImportError:
        from pipes import quote  # Python 2
    sh("mafft --localpair --maxiterate 1000 %s > %s"
       % (quote(str(task.depends[0])), quote(str(task.target))))
def align_profiles(task):
    """Align the alignments specified in task.depends into task.target.

    Each dependency is passed to MAFFT as a ``--seed`` alignment together
    with an empty input sequence file (profile-profile alignment). MAFFT's
    FASTA output is written next to the target, with the extension replaced
    by ``.seq``. That file is then converted to Clustal format at task.target.
    """
    import tempfile
    try:
        from shlex import quote  # Python 3.3+
    except ImportError:
        from pipes import quote  # Python 2
    outseq = ext(task.target, 'seq')
    # Fool MAFFT with an empty input sequence set. Use a private temporary
    # file rather than a fixed name in the working directory, so nothing is
    # left behind, no unrelated file is clobbered, and concurrent runs do not
    # collide.
    fd, empty_seq = tempfile.mkstemp(suffix='.seq')
    try:
        with os.fdopen(fd, 'w') as handle:
            handle.write('\n')  # same contents `echo > file` would produce
        seeds = ' '.join('--seed %s' % quote(str(dep)) for dep in task.depends)
        # Profile-profile alignment...
        sh("mafft --maxiterate 1000 %s %s > %s"
           % (seeds, quote(empty_seq), quote(outseq)))
    finally:
        os.remove(empty_seq)
    # Contrived use of a Python library: FASTA -> Clustal conversion
    SeqIO.convert(outseq, 'fasta',
                  task.target, 'clustal')
# Individually align each FASTA file in the given directory
subfamily_fastas = sorted(glob(os.path.join(TOPDIR, '*.fasta')))
if not subfamily_fastas:
    # Nothing to align: stop with a clear message rather than running MAFFT
    # with no seed alignments on an empty input.
    sys.exit("No *.fasta files found in %s" % TOPDIR)
subfamily_results = [Task(ext(subfa, 'seq'),
                          action=align_fasta,
                          depends=subfa)
                     for subfa in subfamily_fastas]
# Combine the sub-alignments into a single Clustal-format alignment
result = Task(TOPDIR.rstrip('/') + '.aln',
              action=align_profiles,
              depends=subfamily_results,
              # the per-subfamily .seq alignments are intermediates
              cleans=ext(subfamily_results, 'seq'))
result.build()
# NOTE: removing the per-subfamily alignments means the next run treats them
# as out of date and re-aligns every subfamily (no incremental rebuild).
result.clean()
"""Mini-build system for managing dependencies in a computational pipeline.
See: http://etalog.blogspot.com/2012/01/building-analysis-how-to-avoid.html
"""
import logging
import os
import subprocess
from os.path import exists, isfile
# Types treated as a single path rather than an iterable of paths.
try:
    _STRING_TYPES = basestring  # Python 2: str and unicode
except NameError:  # Python 3
    _STRING_TYPES = str


def ext(path, extension):
    """Replace the extension on a path (or iterable of paths).

    :Parameters:
        `path`: string, Task, or iterable of strings/Tasks
            A Task stands for its ``target`` file name.
        `extension`: string
            New extension, without the leading dot.

    Returns a string for a single path, or a list of strings otherwise.
    Only the extension of the final path component is replaced. Dots in
    directory names are left alone ('run.v2/out' -> 'run.v2/out.seq'), and
    a path without an extension simply gains one.
    """
    if isinstance(path, _STRING_TYPES):
        return os.path.splitext(path)[0] + '.' + extension
    if isinstance(path, Task):
        return ext(path.target, extension)
    return [ext(p, extension) for p in path]


def noext(path):
    """Strip the extension from a path (or iterable of paths).

    Accepts the same inputs as ext(). Returns a string for a single path, or
    a list of strings otherwise. Only the final path component's extension
    is removed; a path without an extension is returned unchanged.
    """
    if isinstance(path, _STRING_TYPES):
        return os.path.splitext(path)[0]
    if isinstance(path, Task):
        return noext(path.target)
    return [noext(p) for p in path]
def sh(cmd):
    """Run `cmd` through the shell, raising if it fails.

    :Parameters:
        `cmd`: string
            Shell command line. It runs with ``shell=True``, so callers must
            quote any file names they interpolate into it.

    :Raises:
        `OSError` if the command cannot be started, or
        `subprocess.CalledProcessError` if it exits with a non-zero status.
        In both cases the failed command is logged first.
    """
    try:
        subprocess.check_call(cmd, shell=True)
    except (OSError, subprocess.CalledProcessError):
        logging.warning("*** Failed command: %s", str(cmd))
        # Bare ``raise`` keeps the original traceback; ``raise exc`` would
        # (on Python 2) restart the traceback at this line.
        raise
class Task(object):
    """Dependency specification, in terms of output, inputs, intermediates & process.

    :Parameters:
        `target`: string
            Filename or directory to build.
        `action`: callable
            Function to call to build the target, invoked as
            ``action(task, **kwargs)`` with this Task as its first argument.
            Implicitly uses self.depends as inputs and writes to self.target,
            though this isn't checked.
        `depends`: Task, string, or iterable of Tasks and/or strings
            Dependencies required to be up-to-date to build self.target.
            Strings are plain input file names. Converted to a list
            automatically, if needed.
        `cleans`: string or iterable of strings
            File names to be removed by clean(). Converted to a list
            automatically.
        `kwargs`: dict
            Extra keyword arguments passed to `action`.

    :Raises:
        `TypeError` if `action` is not callable.
    """

    # Types treated as a single name rather than as an iterable of names.
    try:
        _STRING_TYPES = basestring  # Python 2: str and unicode
    except NameError:  # Python 3
        _STRING_TYPES = str

    @staticmethod
    def _enlist(x):
        """Normalize None/empty, a single Task or string, or an iterable to a list."""
        if not x:
            return []
        if isinstance(x, (Task, Task._STRING_TYPES)):
            return [x]
        return list(x)

    def __init__(self, target, action=lambda x: None, depends=None, cleans=None,
                 kwargs=None):
        # Validate with a real exception rather than ``assert`` (which
        # ``python -O`` strips), so a bad action fails here, not in build().
        if not callable(action):
            raise TypeError("action must be callable, got %r" % (action,))
        # Filename or directory to build
        self.target = target
        self.action = action
        self.depends = self._enlist(depends)
        self.kwargs = kwargs or {}
        self.cleans = self._enlist(cleans)

    def build(self):
        """Bring self.target up to date, building stale dependency Tasks first.

        The action runs unless the target exists and is at least as new as
        every dependency. Any exception from the action is logged and
        re-raised.
        """
        for dep in self.depends:
            if isinstance(dep, Task) and not dep.is_up_to_date():
                dep.build()
        if exists(self.target) and all(self.is_newer_than(dep)
                                       for dep in self.depends):
            logging.info("%s: Up to date.", self.target)
        else:
            logging.info("%s: Building...", self.target)
            try:
                self.action(self, **self.kwargs)
            except BaseException:
                # Log any failure (including KeyboardInterrupt), then re-raise.
                logging.warning("*** Failed action @ %s", self.target)
                raise

    def is_newer_than(self, dep):
        """True if self.target exists and its mtime is >= that of `dep`.

        `dep` may be a Task (its target is compared) or a file name. Raises
        OSError if self.target exists but `dep` does not.
        """
        if not exists(self.target):
            return False
        return (os.stat(self.target).st_mtime
                >= os.stat(str(dep)).st_mtime)

    def is_up_to_date(self):
        """True if the target does not need to be rebuilt."""
        return exists(self.target) and all(
            (dep.is_up_to_date()
             if isinstance(dep, Task)
             else exists(dep))
            and self.is_newer_than(dep)
            for dep in self.depends)

    def clean(self):
        """Recursively delete auxiliary files.

        Removes each existing regular file named in self.cleans, then cleans
        every dependency Task. The target is kept unless listed in cleans.
        """
        for fname in self.cleans:
            if isfile(fname):
                logging.info('rm %s', fname)
                os.remove(fname)
        for dep in self.depends:
            if isinstance(dep, Task):
                dep.clean()

    def __cmp__(self, other):
        """Enable sorting alphabetically by target name (Python 2)."""
        return cmp(self.target, str(other))

    def __lt__(self, other):
        """Order by target name; Python 3 ignores __cmp__ and sorts with this."""
        return self.target < str(other)

    def __str__(self):
        return self.target
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment