Skip to content

Instantly share code, notes, and snippets.

@JudoWill JudoWill/pavement
Created Oct 1, 2010

Embed
What would you like to do?
An example of using both paver and ruffus to do scientific workflows.
from paver.easy import *
import os.path, os
import csv
import ruffus
from collections import defaultdict
from types import GeneratorType, DictType
from itertools import ifilter
# Pipeline-wide settings; the rest of this file reads them back as
# attributes of ``options`` (e.g. ``options.DATA_DIR``).
options(
    # Root directory under which the raw input data sets are looked up.
    DATA_DIR='Data',
    # Directory receiving the per-study tab-delimited outputs and the
    # merged ``results.out`` file.
    PROCESSED='Data/Processed',
    # STITCH links are kept only when their combined_score is strictly
    # greater than this value.
    STITCH_CUT=900,
)
@task
def touch_data():
    """Run ``touch`` on every file found below ``options.DATA_DIR``.

    The whole path (directory part included) is shell-quoted before it is
    interpolated into the command, so spaces, quotes and other shell
    metacharacters anywhere in the path reach ``touch`` as one literal
    argument.  Escaping only the spaces of the file name, as was done
    before, broke on directories containing spaces and on any other
    special character.
    """
    # Function-scope import keeps this block self-contained.  ``shlex.quote``
    # is the Python 3 name; ``pipes.quote`` is the Python 2 equivalent.
    try:
        from shlex import quote
    except ImportError:
        from pipes import quote

    for path, _, files in os.walk(options.DATA_DIR):
        for f in files:
            sh('touch %s' % quote(os.path.join(path, f)))
@task
def run():
    """Paver task that runs the ruffus pipeline with ``top_function`` as target."""
    final_targets = [top_function]
    ruffus.pipeline_run(final_targets)
@ruffus.follows('process_data', 'process_stitch')
def top_function():
    """Do nothing; serves as the single pipeline target.

    It is declared to follow ``process_data`` and ``process_stitch`` and is
    the task handed to ``ruffus.pipeline_run`` by ``run``.
    """
    return None
@ruffus.merge(os.path.join(options.PROCESSED, '*.csv'),
              os.path.join(options.PROCESSED, 'results.out'))
@ruffus.follows('process_bras', 'process_chassey', 'process_fu',
                'process_konig', 'process_kumar', 'process_shapira',
                'process_tai')
def process_data(in_files, out_file):
    """Concatenate the per-study tab-delimited files into one output file.

    Each input file is parsed with ``csv.DictReader`` (so its first line is
    consumed as the column names) and its rows are appended to ``out_file``.
    No header line is written to ``out_file`` itself.

    in_files -- paths of the tab-delimited files to merge
    out_file -- path of the merged file to create
    """
    columns = ('Symbol', 'Viral-Protein',
               'Disease', 'Method', 'Ref')
    with open(out_file, 'w') as merged:
        sink = csv.DictWriter(merged, columns, delimiter='\t')
        for path in in_files:
            with open(path) as source:
                rows = csv.DictReader(source, delimiter='\t')
                sink.writerows(rows)
@ruffus.files(os.path.join(options.DATA_DIR, 'Bras-2008', 'Table-S2.csv'),
              os.path.join(options.PROCESSED, 'Bras-2008.csv'))
def process_bras(in_file, out_file):
    """Convert the Bras-2008 table into the common five-column format.

    Every input row produces one record whose ``Symbol`` comes from the
    row's "Symbol" column; all other fields are fixed for this study.
    """
    # Fields that are identical for every record of this study.
    constant_fields = {
        'Viral-Protein': 'Unknown',
        'Disease': 'HIV',
        'Method': 'RNAi',
        'Ref': 'Bras-2008',
    }

    def conv_fun(row):
        return dict(constant_fields, Symbol=row["Symbol"])

    process_file(in_file, out_file, conv_fun)
@ruffus.files(os.path.join(options.DATA_DIR, 'Chassey-2008', 'msb200866-s2.csv'),
              os.path.join(options.PROCESSED, 'Chassey-2008.csv'))
def process_chassey(in_file, out_file):
    """Convert the Chassey-2008 table into the common five-column format.

    A row yields one record per evidence column ("Text Mining", then "Y2H")
    that is non-empty after stripping whitespace, so a single row may
    produce zero, one or two records.
    """
    # (input column, value written to 'Method'), in emission order.
    evidence_columns = (
        ("Text Mining", 'Literature'),
        ("Y2H", 'Y2H'),
    )

    def conv_fun(row):
        for column, method in evidence_columns:
            if len(row[column].strip()) > 0:
                yield {
                    'Symbol': row["Gene Symbol"],
                    'Viral-Protein': row["HCV-Protein"],
                    'Disease': 'HCV',
                    'Method': method,
                    'Ref': 'Chassey-2008',
                }

    process_file(in_file, out_file, conv_fun)
@ruffus.files(os.path.join(options.DATA_DIR, 'Fu-2009', 'hiv_interactions'),
              os.path.join(options.PROCESSED, 'Fu-2009.csv'))
def process_fu(in_file, out_file):
    """Convert the Fu-2009 interaction table into the common format.

    The "Gene ID 2" column is translated to a symbol through the mapping
    returned by ``get_geneID2Symbol()``; the "product name 1" column is
    used as the viral protein.
    """
    gene_names = get_geneID2Symbol()

    def conv_fun(row, id_to_symbol):
        record = {
            'Disease': 'HIV',
            'Method': 'Literature',
            'Ref': 'Fu-2009',
        }
        record['Symbol'] = id_to_symbol[row["Gene ID 2"]]
        record['Viral-Protein'] = row["product name 1"]
        return record

    # The mapping is handed to conv_fun as an extra positional argument.
    process_file(in_file, out_file, conv_fun, extra=(gene_names,))
@ruffus.files(os.path.join(options.DATA_DIR, 'Konig-2010', 'nature08699-s8.csv'),
              os.path.join(options.PROCESSED, 'Konig-2010.csv'))
def process_konig(in_file, out_file):
    """Convert the Konig-2010 table into the common five-column format.

    Only rows whose "Influenza (this study)" cell is non-empty produce a
    record; the cell is not stripped, so whitespace counts as content.
    """
    def conv_fun(row):
        if not row["Influenza (this study)"]:
            # Empty cell: nothing to report for this row.
            return None
        return dict([
            ('Symbol', row["Symbol"]),
            ('Viral-Protein', 'Unknown'),
            ('Disease', 'Influenza'),
            ('Method', 'RNAi'),
            ('Ref', 'Konig-2010'),
        ])

    process_file(in_file, out_file, conv_fun)
@ruffus.files(os.path.join(options.DATA_DIR, 'Kumar-2010', 'mmc2.csv'),
              os.path.join(options.PROCESSED, 'kumar-2010.csv'))
def process_kumar(in_file, out_file):
    """Convert the Kumar-2010 table into the common five-column format.

    A row produces a record only when its "p-value" column, parsed as a
    float, is below 0.05.  A cell that cannot be parsed as a float raises
    ``ValueError``.
    """
    def conv_fun(row):
        significant = float(row["p-value"]) < 0.05
        if not significant:
            return None
        record = dict.fromkeys(('Symbol', 'Viral-Protein', 'Disease',
                                'Method', 'Ref'))
        record['Symbol'] = row["GeneSymbol"]
        record['Viral-Protein'] = 'Unknown'
        record['Disease'] = 'Influenza'
        record['Method'] = 'RNAi'
        record['Ref'] = 'Kumar-2010'
        return record

    process_file(in_file, out_file, conv_fun)
@ruffus.files(os.path.join(options.DATA_DIR, 'Shapira-2009', 'mmc2.csv'),
              os.path.join(options.PROCESSED, 'Shapira-2009.csv'))
def process_shapira(in_file, out_file):
    """Convert the Shapira-2009 table into the common five-column format.

    Every row yields one Shapira-2009 record.  In addition, for each of the
    three cross-reference columns listed below that is non-empty after
    stripping, a further record attributed to the referenced study is
    yielded (these carry no 'Viral-Protein' value).
    """
    # Cross-reference column name -> fixed fields of the extra record.
    cross_refs = {
        'HCV Li et al. (25 genes)': {
            'Ref': 'Li-2009',
            'Disease': 'HCV',
            'Method': 'RNAi'
        },
        "WNV Krishnan et al. (14 genes)": {
            'Ref': 'Krishnan-2008',
            'Disease': 'West-Nile',
            'Method': 'RNAi'
        },
        "HIV Zhou et al. (5 genes)": {
            'Ref': 'Zhou-2008',
            'Disease': 'HIV',
            'Method': 'RNAi'
        }
    }
    gene_names = get_geneID2Symbol()

    def conv_fun(row, symbol_id, pdict):
        gene = symbol_id[row['entrez gene ID']]
        own_record = {
            'Symbol': gene,
            'Ref': 'Shapira-2009',
            'Disease': 'Influenza',
            'Method': 'Y2H',
            'Viral-Protein': 'Unknown'
        }
        yield own_record
        for field in pdict:
            if len(row[field].strip()) > 0:
                extra_record = dict(pdict[field])
                extra_record['Symbol'] = gene
                yield extra_record

    process_file(in_file, out_file, conv_fun, extra=(gene_names, cross_refs))
@ruffus.files(os.path.join(options.DATA_DIR, 'Tai-2009', 'mmc3.csv'),
              os.path.join(options.PROCESSED, 'Tai-2009.csv'))
def process_tai(in_file, out_file):
    """Convert the Tai-2009 table into the common five-column format.

    Every input row produces one record whose ``Symbol`` comes from the
    row's 'Symbol' column; all other fields are fixed for this study.
    """
    # Fields that are identical for every record of this study.
    constant_fields = {
        'Ref': 'Tai-2009',
        'Disease': 'HCV',
        'Method': 'RNAi',
        'Viral-Protein': 'Unknown',
    }

    def conv_fun(row):
        return dict(constant_fields, Symbol=row['Symbol'])

    process_file(in_file, out_file, conv_fun)
@ruffus.files([os.path.join(options.DATA_DIR, 'stitch', 'protein.aliases.v8.2.txt'),
               os.path.join(options.DATA_DIR, 'stitch', 'protein_chemical.human.links.v2.0.tsv'),
               os.path.join(options.DATA_DIR, 'stitch', 'chemical.aliases.v2.0.tsv'),],
              os.path.join(options.DATA_DIR, 'stitch', 'processed.csv'))
def process_stitch(in_files, out_file):
    """Write a tab-delimited Symbol/Chemical table from the STITCH files.

    in_files -- sequence of three paths, in this order: protein aliases,
                protein-chemical links, chemical aliases
    out_file -- path of the two-column file to create (no header row)

    A link is written when its protein starts with '9606.' and its
    combined_score is greater than ``options.STITCH_CUT``.  Protein and
    chemical identifiers are replaced through the alias tables when an
    entry exists and are written unchanged otherwise.  Progress is printed
    every 10000 link rows.
    """
    alias_path, links_path, chem_alias_path = in_files
    symbol_id = get_geneID2Symbol(source='symbol')

    print('getting protein-conv')
    protein_conv = {}
    alias_columns = ('species', 'protein-id', 'alias', 'source')
    with open(alias_path) as handle:
        # The first line is discarded before parsing with explicit columns.
        next(handle)
        alias_rows = csv.DictReader(handle, fieldnames=alias_columns,
                                    delimiter='\t')
        for row in alias_rows:
            if row['species'] != '9606' or row['source'] != 'Ensembl_EntrezGene':
                continue
            protein_conv[row['protein-id']] = symbol_id[row['alias']]

    print('getting chem-conv')
    chem_conv = {}
    with open(chem_alias_path) as handle:
        # Column names come from this file's own first line.
        for entry in csv.DictReader(handle, delimiter='\t'):
            chem_conv[entry['chemical']] = entry['alias']

    out_columns = ('Symbol', 'Chemical')
    link_columns = ('chemical', 'protein', 'combined_score')
    print('processing interactions')
    with open(out_file, 'w') as out_handle:
        writer = csv.DictWriter(out_handle, out_columns, delimiter='\t')
        with open(links_path) as handle:
            link_rows = csv.DictReader(handle, fieldnames=link_columns,
                                       delimiter='\t')
            for count, link in enumerate(link_rows):
                if count % 10000 == 0:
                    print(count)
                protein = link['protein']
                if not protein.startswith('9606.'):
                    continue
                # Drop the '9606.' prefix to obtain the alias-table key.
                prot = protein.split('.', 1)[1]
                if not (int(link['combined_score']) > options.STITCH_CUT):
                    continue
                chemical = link['chemical']
                writer.writerow({
                    'Symbol': protein_conv.get(prot, prot),
                    'Chemical': chem_conv.get(chemical, chemical)
                })
def get_geneID2Symbol(source = 'geneid'):
    """Build a lookup from one gene_info column to the gene symbol.

    Reads ``Homo_sapiens.gene_info`` from ``options.DATA_DIR`` as a
    tab-delimited file with the fixed column names listed below.

    source -- name of the column whose values become the keys
              (default 'geneid')

    Returns a ``defaultdict`` mapping each key to the row's 'symbol' value;
    looking up an unknown key gives ``None``.
    """
    gene_info_path = os.path.join(options.DATA_DIR, 'Homo_sapiens.gene_info')
    columns = ('taxid', 'geneid', 'symbol', 'locustag', 'synonyms',
               'dbXrefs', 'chromosome', 'maplocation', 'description',
               'type', 'symauth', 'fullauth', 'status', 'other', 'date')
    lookup = defaultdict(lambda : None)
    with open(gene_info_path) as handle:
        reader = csv.DictReader(handle, fieldnames=columns, delimiter='\t')
        for row in reader:
            # NOTE(review): only source == 'symbol' is split on '|'; this
            # may have been meant for the 'synonyms' column -- confirm.
            if source == 'symbol':
                keys = row[source].split('|')
            else:
                keys = [row[source]]
            for key in keys:
                lookup[key] = row['symbol']
    return lookup
def process_file(in_file, out_file, func, extra = tuple()):
    """Convert a tab-delimited file into the common five-column format.

    in_file  -- path of the tab-delimited input; its first line supplies
                the column names
    out_file -- path of the tab-delimited output; a header line with the
                five common columns is written first
    func     -- callable invoked as ``func(row, *extra)`` for every input
                row.  It may return a dict (one output row), a generator
                of dicts (several output rows), or anything else such as
                ``None`` (row skipped).
    extra    -- additional positional arguments forwarded to ``func``

    Return values are recognised with ``isinstance`` so that dict
    subclasses (for example ``collections.OrderedDict``) are written too;
    an exact ``type(...) ==`` comparison silently dropped them.
    """
    fields = ('Symbol', 'Viral-Protein',
              'Disease', 'Method', 'Ref')
    with open(in_file) as in_handle:
        line_gen = csv.DictReader(in_handle, delimiter = '\t')
        with open(out_file, 'w') as out_handle:
            out_handle.write('\t'.join(fields)+'\n')
            writer = csv.DictWriter(out_handle,
                                    fields,
                                    delimiter = '\t')
            for row in line_gen:
                val = func(row, *extra)
                if isinstance(val, GeneratorType):
                    writer.writerows(val)
                elif isinstance(val, dict):
                    writer.writerow(val)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.