Last active
August 29, 2015 13:59
-
-
Save standage/10765999 to your computer and use it in GitHub Desktop.
Given TransDecoder output from two samples, identify instances where CDS starts and stops match but translation is different.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
import getopt, os, re, sys | |
class Seq: | |
def __init__(self, defline, seq): | |
self.defline = defline | |
self.seq = seq | |
match = re.match("^>(\S+)", defline) | |
self.id = match.group(1) | |
def print_usage(outstream): | |
usage = ("\nUsage: tdc-diff s1.cds.gff3 s1.pep.fa s2.cds.gff3 s2.pep.fa\n" | |
" Options:\n" | |
" -h|--help: print this help message and exit\n" | |
" -o|--out: FILE file to which output will be printed;\n" | |
" default is terminal (stdout)\n") | |
print >> outstream, usage | |
def parse_fasta(fp): | |
""" | |
Stolen shamelessly from http://stackoverflow.com/a/7655072/459780. | |
""" | |
name, seq = None, [] | |
for line in fp: | |
line = line.rstrip() | |
if line.startswith(">"): | |
if name: yield (name, ''.join(seq)) | |
name, seq = line, [] | |
else: | |
seq.append(line) | |
if name: yield (name, ''.join(seq)) | |
def parse_options(argv): | |
params = { | |
'outstream': sys.stdout, | |
} | |
optstr = "ho:" | |
longopts = ["help", "out"] | |
(options, args) = getopt.getopt(argv[1:], optstr, longopts) | |
for key, value in options: | |
if key in ('-h', '--help'): | |
print_usage(sys.stdout) | |
sys.exit(0) | |
elif key in ('-o', '--out'): | |
params['outstream'] = open(value, 'w') | |
else: | |
print_usage(sys.stderr) | |
assert False, "unsupported option '%s'" % key | |
if len(args) != 4: | |
print_usage(sys.stderr) | |
assert False, "expected 4 arguments, %d provided" % len(args) | |
params['gff1'] = os.path.abspath(args[0]) | |
params['fa1'] = os.path.abspath(args[1]) | |
params['gff2'] = os.path.abspath(args[2]) | |
params['fa2'] = os.path.abspath(args[3]) | |
return params | |
def fasta_to_dict(filename): | |
""" | |
Input: filname of Fasta file | |
Returns a dictionary with sequence IDs as keys and sequence records as values | |
""" | |
fasta_lookup = {} | |
with open(filename, 'r') as fp: | |
for defline, seq in parse_fasta(fp): | |
record = Seq(defline, seq) | |
fasta_lookup[record.id] = record | |
return fasta_lookup | |
def cds_ranges(filename): | |
""" | |
Input: filename of a GFF3 file | |
Returns a dictionary with genomic ranges as keys and a list of mRNA IDs as values | |
""" | |
starts = {} | |
stops = {} | |
gff3 = open(filename, 'r') | |
for line in gff3: | |
if line == "" or line.startswith("#"): | |
continue | |
line = line.rstrip() | |
fields = line.split('\t') | |
matches = re.search("Parent=([^;]+)", fields[8]) | |
if fields[2] == "start_codon": | |
starts[matches.group(1)] = fields | |
if fields[2] == "stop_codon": | |
stops[matches.group(1)] = fields | |
ranges = {} | |
for mrnaid in starts.keys(): | |
start = starts[mrnaid] | |
stop = stops[mrnaid] | |
rng = "%s_%s-%s" % (start[0], start[3], stop[4]) | |
if start[6] == "-": | |
rng = "%s_%s-%s" % (start[0], stop[3], start[4]) | |
if rng not in ranges: | |
ranges[rng] = [] | |
ranges[rng].append(mrnaid) | |
return ranges | |
def unmatched_pairs(list1, seqs1, list2, seqs2): | |
""" | |
Input: lists of IDs of CDSs from two samples (list1 and list2) and corresponding sequences (seqs1 and seqs2) | |
Returns a list of all pairs of CDSs from the two samples that do not have a match in the other sample | |
""" | |
matched1 = {} | |
matched2 = {} | |
for id1 in list1: | |
for id2 in list2: | |
seq1 = seqs1["cds."+ id1] | |
seq2 = seqs2["cds."+ id2] | |
if seq1.seq == seq2.seq: | |
matched1[id1] = 1 | |
matched2[id2] = 1 | |
unmatched1 = [] | |
for id1 in list1: | |
if id1 not in matched1: | |
unmatched1.append(id1) | |
unmatched2 = [] | |
for id2 in list2: | |
if id2 not in matched2: | |
unmatched2.append(id2) | |
pairs = [] | |
for id1 in unmatched1: | |
for id2 in unmatched2: | |
pairs.append(["cds."+ id1, "cds."+ id2]) | |
return pairs | |
if __name__ =='__main__': | |
params = parse_options(sys.argv) | |
seqs1 = fasta_to_dict(params['fa1']) | |
seqs2 = fasta_to_dict(params['fa2']) | |
ranges1 = cds_ranges(params['gff1']) | |
ranges2 = cds_ranges(params['gff2']) | |
for rng in ranges1: | |
if rng in ranges2: | |
list1 = ranges1[rng] | |
list2 = ranges2[rng] | |
pairs = unmatched_pairs(list1, seqs1, list2, seqs2) | |
for pair in pairs: | |
record1 = seqs1[pair[0]] | |
record2 = seqs2[pair[1]] | |
print "%s\t%s\t%s\n%s\n%s\n" % (rng, pair[0], pair[1], record1.seq, record2.seq) | |
params['outstream'].close() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment