Skip to content

Instantly share code, notes, and snippets.

@mkyt
Last active August 29, 2015 14:01
Show Gist options
  • Save mkyt/08b0cb23b2e68475cc6f to your computer and use it in GitHub Desktop.
Save mkyt/08b0cb23b2e68475cc6f to your computer and use it in GitHub Desktop.
change indentation "\t" -> " "
#!/usr/bin/env python
from Bio import SeqIO
from Bio.SeqRecord import SeqRecord
from Bio.Seq import Seq
from Bio.Blast.Applications import NcbiblastpCommandline
from Bio.Blast import NCBIXML
import sys
if sys.hexversion >= 0x3000000:
from io import StringIO
else:
from StringIO import StringIO
import tempfile
import os.path
class AlignResult(object):
def __init__(self, bits, length, s1_s, s1_e, s2_s, s2_e, s1, s2, m, ident, gaps, score, frame):
self.bits = bits
self.length = length
self.s1_range = (s1_s-1, s1_e) # convert to 0-origin, half-closed range
self.s2_range = (s2_s-1, s2_e)
self.s1 = s1
self.s2 = s2
self.m = m
self.ident = ident
self.gaps = gaps
self.score = score
self.strand = frame
self.contradictions = []
self.aligned = self._generate_aligned()
def __repr__(self):
range_stringify= lambda x: '{:d}..{:d}'.format(*x)
strand_stringify = lambda x: '/'.join('Plus' if e > 0 else 'Minus' for e in x)
return '<AlignResult [{}] <-> [{}] (score: {}, length: {}, ident: {}, gaps: {}, strand: {})>'.format(range_stringify(self.s1_range), range_stringify(self.s2_range), self.score, self.length, self.ident, self.gaps, strand_stringify(self.strand))
def _generate_aligned(self):
ERR = ('N', '-') # "cannot be read" or gap
res = Seq('')
s1_pos = self.s1_range[0]
s2_pos = self.s2_range[0]
for i in range(self.length):
c1 = self.s1[i]
c2 = self.s2[i]
if c1 in ERR:
res += c2
else:
res += c1
if c2 not in ERR and c1 != c2:
self.contradictions.append(((s1_pos, c1), (s2_pos, c2))) # two reads contradicts
s1_pos += self.strand[0]
s2_pos += self.strand[1]
return res
def align_two(s1, s2):
s1_f = os.path.join(tempfile.gettempdir(), 's1.fasta')
s2_f = os.path.join(tempfile.gettempdir(), 's2.fasta')
SeqIO.write(SeqRecord(s1), open(s1_f, 'w+'), 'fasta')
SeqIO.write(SeqRecord(s2), open(s2_f, 'w+'), 'fasta')
output = NcbiblastpCommandline(cmd='blastn', query=s1_f, subject=s2_f, outfmt=5)()[0]
blast_result_record = NCBIXML.read(StringIO(output))
try:
hsp = blast_result_record.alignments[0].hsps[0]
except IndexError:
# blast returned no result
return None
return AlignResult(hsp.bits, hsp.align_length, hsp.query_start, hsp.query_end, hsp.sbjct_start, hsp.sbjct_end, hsp.query, hsp.sbjct, hsp.match, hsp.identities, hsp.gaps, hsp.score, hsp.frame)
def evalQuality(s):
rate = 1.0
res = 0
for i in range(len(s)):
if s[i] == 'N':
rate *= 0.9
else:
res += rate
return res
def concat_two(s1, s2, alignRes):
s1_s, s1_e = alignRes.s1_range
s1_after = len(s1) - s1_e # length after aligned region
s2_s, s2_e = alignRes.s2_range
res = Seq('')
rev = alignRes.strand[1] < 0
src = []
if rev:
s2 = s2.complement()
s2_s, s2_e = s2_e-1, s2_s+1
ss1_before = evalQuality(s1[s1_s-1::-1]) if s1_s > 0 else 0 # s1:(s-1)..0
ss1_after = evalQuality(s1[s1_e:]) # s1:e..last
ss2_before = evalQuality(s2[s2_s-1::-1]) if s2_s > 0 else 0 # s2:(s-1)..0
ss2_after = evalQuality(s2[s2_e:]) #s2:e..last
if rev:
ss2_before, ss2_after = ss2_after, ss2_before
print('scores:{}:{} {}:{}'.format(ss1_before, ss1_after, ss2_before, ss2_after))
beg = ss1_before > ss2_before # s1 is better at the beginning
ed = ss1_after > ss2_after # s1 is better at the end
if beg and ed:
# s1 contains s2
res += s1[:]
src.append(('s1', len(res)))
elif not beg and not ed:
# s2 contains s1
res += s2[::-1 if rev else 1]
src.append(('s2', len(res)))
elif beg and not ed:
# s1 -> s2
res += s1[:s1_s] # s1:0..(s-1)
src.append(('s1', len(res)))
res += alignRes.aligned # s1:s..(e-1) == s2:s..(e-1)
src.append(('aligned', len(res)))
if not rev:
res += s2[s2_e:] # s2:e..last
else:
if s2_s > 0:
res += s2[s2_s-1::-1] # s2:(s-1)..0
src.append(('s2', len(res)))
else:
# s2 -> s1
if not rev:
res += s2[:s2_s] # s2:0..(s-1)
else:
res += s2[:s2_e-1:-1] # s2:last..e
src.append(('s2', len(res)))
res += alignRes.aligned # s1:s..(e-1) == s2:s..(e-1) or s2:(e-1)..s
src.append(('aligned', len(res)))
res += s1[s1_e:] # s1:e..last
src.append(('s1', len(res)))
print(src)
return res, src
def concatenate(seqs):
left = len(seqs)
consumed = set()
res = dict()
while left > 1:
mx_score, mx_pair = -1, (-1, -1)
l = len(seqs)
for i in range(l):
if i in consumed:
continue
for j in range(i+1, l):
if j in consumed:
continue
if (i, j) not in res:
res[(i, j)] = align_two(seqs[i], seqs[j])
if res[(i, j)] is not None and res[(i, j)].score > mx_score:
mx_score = res[(i, j)].score
mx_pair = (i, j)
if mx_score < 0:
print("alignment failed before consuming all sequences supplied")
break
m1, m2 = mx_pair
print('merging seq #{:d} and #{:d} (score {}) to obtain seq #{:d}'.format(m1, m2, mx_score, len(seqs)))
print(res[mx_pair])
new_seq, src = concat_two(seqs[m1], seqs[m2], res[mx_pair])
consumed.add(m1)
consumed.add(m2)
print(new_seq)
contra = res[mx_pair].contradictions
# TODO: notify users of contradictions
seqs.append(new_seq)
left -= 1
return seqs[-1]
def findORFs(seq):
table = 1
min_pro_len = 200
for strand, nuc in [(+1, seq), (-1, seq.reverse_complement())]:
for frame in range(3):
print(nuc[frame:].translate(table))
for pro in nuc[frame:].translate(table).split("*"):
if len(pro) >= min_pro_len:
print("%s...%s - length %i, strand %i, frame %i" % (pro[:30], pro[-3:], len(pro), strand, frame))
print()
print(pro)
def main(progname, args):
if len(args) < 3:
print('usage: {} [INPUT_FILE 1] [INPUT_FILE 2]...'.format(progname))
return 1
seqs = [next(SeqIO.parse(f, 'fasta')).seq for f in args]
res = concatenate(seqs)
print(res)
res2 = res.reverse_complement()
print(res2)
#findORFs(res2)
return 0
if __name__ == '__main__':
sys.exit(main(sys.argv[0], sys.argv[1:]))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment