Skip to content

Instantly share code, notes, and snippets.

@flashton2003
Created June 20, 2018 08:00
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save flashton2003/b6c3e4e31e9084220fd30188988808f5 to your computer and use it in GitHub Desktop.
Save flashton2003/b6c3e4e31e9084220fd30188988808f5 to your computer and use it in GitHub Desktop.
from __future__ import division
from Bio import SeqIO
import pprint
import glob
import os
import sys
import itertools
class Contigs():
def __init__(self, contig_handle):
self.contig_handle = contig_handle
self.colibs = {}
self.read_in_contigs()
self.calc_contig_range()
def read_in_contigs(self):
self.contig_seq = {}
self.contig_order = []
for contig in SeqIO.parse(self.contig_handle, 'fasta'):
self.contig_order.append(contig.id)
self.contig_seq[contig.id] = str(contig.seq)
def calc_contig_range(self):
i = 0
self.contig_range = {}
for contig in self.contig_order:
self.contig_range[contig] = [i, i+len(self.contig_seq[contig])]
z = i+len(self.contig_seq[contig])
i += len(self.contig_seq[contig])
total_assembly_length = sum([len(self.contig_seq[x]) for x in self.contig_seq])
assert total_assembly_length == z
class CoLiB():
"""docstring for CoLiB"""
def __init__(self, match_1, match_2, colib_id):
self.colib_id = colib_id
self.match_1_cons_pos = [match_1[1], match_1[2]]
self.match_2_cons_pos = [match_2[1], match_2[2]]
self.match_1_strand = match_1[4]
self.match_2_strand = match_2[4]
self.match_1_name = None
self.match_2_name = None
## set these to false for all the singleton contig/colibs which dont get checked for upstream/downstream
self.match_1_upstream = False
self.match_2_upstream = False
self.match_1_downstream = False
self.match_2_downstream = False
# self.match_1_chromo_pos = []
# self.match_2_chromo_pos = []
def calc_intersection(self, list1, list2):
## do it this way because looking at intersection of ranges takes ages.
## it returns a negative value if there is no intersection
d1 = 0
d2 = 0
if max(list2) - max(list1) < 0:
d1 = abs(max(list2) - max(list1))
if min(list2) - min(list1) < 0:
d2 = abs(min(list2) - min(list1))
return (max(list1) - min(list2)) - (d1 + d2)
def assign_to_contig(self, contigs_1, contigs_2):
for contig in contigs_1.contig_range:
## calculate the intersection between each contig and each colib
inter = self.calc_intersection(self.match_1_cons_pos, contigs_1.contig_range[contig])
## if the intersection accounts for either 2% of the contig length or 1000bp, set the colib match name as the contig
## also, add the colib to the contigs.colib dictionary which is like
## this {'contig4':[colib1, colib2]}
if inter / (contigs_1.contig_range[contig][1] - contigs_1.contig_range[contig][0]) > 0.02 or inter > 1000:
#print contig, inter, (contigs_1.contig_range[contig][1] - contigs_1.contig_range[contig][0])
self.match_1_name = contig
if contig in contigs_1.colibs:
contigs_1.colibs[contig].append(self)
else:
contigs_1.colibs[contig] = []
contigs_1.colibs[contig].append(self)
for contig in contigs_2.contig_range:
inter = self.calc_intersection(self.match_2_cons_pos, contigs_2.contig_range[contig])
if inter / (contigs_2.contig_range[contig][1] - contigs_2.contig_range[contig][0]) > 0.02 or inter > 1000:
#print contig, inter, (contigs_2.contig_range[contig][1] - contigs_2.contig_range[contig][0])
self.match_2_name = contig
if contig in contigs_2.colibs:
contigs_2.colibs[contig].append(self)
else:
contigs_2.colibs[contig] = []
contigs_2.colibs[contig].append(self)
def usage():
print '''Usage is:
python infer_rearrangements.py assembly_1.fa assembly_2.fa /path/to/output_dir
Assemblies should end .fa'''
sys.exit()
def get_args():
if len(sys.argv) != 4:
usage()
else:
return sys.argv[1], sys.argv[2], sys.argv[3]
def read_in_xmfa(xmfa_handle):
all_results = []
with open(xmfa_handle) as fi:
lines = fi.readlines()
lines = [x.strip() for x in lines]
lines = [x for x in lines if x.startswith('> ')]
lines = [x.lstrip('> ') for x in lines]
while len(lines) > 1:
first = lines.pop(0).replace(':', ' ').split(' ')
first = [int(first[0]), int(first[1].split('-')[0]), int(first[1].split('-')[1]), first[-1], first[2]]
# print first
second = lines.pop(0).replace(':', ' ').split(' ')
second = [int(second[0]), int(second[1].split('-')[0]), int(second[1].split('-')[1]), second[-1], second[2]]
all_results.append([first, second])
paired_colibs = []
## colib is colinear block
i = 1
for colib in all_results:
if colib[0][0] == 1 and colib[1][0] == 2:
# print colib
clb = CoLiB(colib[0], colib[1], i)
i += 1
paired_colibs.append(clb)
return paired_colibs
def get_up_downstream_colib(contigs_1, contigs_2, contig_pair):
'''
0. for every colib in contigs_1.colibs[contig]
1. is colib.match1 the other co-lib up or downstream of it?
2.
for colib in contigs1.colibs[contig]:
what is upstream and downstream of the colib?
what is upstream and downstream of the same colib on contigs_2.colibs[colib.match_2_name]?
taking strand differences into account, are there different colibs either upstream or downstream of the original colib in the two strains?
'''
## i think this way of doing the match 1 by iterating through contigs 1 and
## match 2 by iterating through contigs 2 works ok
## default of all upstream/downstream is false, otherwith all the up/downstream for contigs with only one clb is None and the logic breaks
for i, colib in enumerate(contigs_1.colibs[contig_pair[0]]):
## if its the first one
if i == 0:
## then upstream must be false (default)
## downstream is the colib one downstream of this one
## contig_pair[0] is the contig this strain is on
## contigs_1.colibs[contig_pair[0]] is the list of colibs for this contig
## i + 1 is the one downstream of this
colib.match_1_downstream = contigs_1.colibs[contig_pair[0]][i + 1].colib_id
elif i + 1 == len(contigs_1.colibs[contig_pair[0]]):
colib.match_1_upstream = contigs_1.colibs[contig_pair[0]][i - 1].colib_id
else:
colib.match_1_upstream = contigs_1.colibs[contig_pair[0]][i - 1].colib_id
colib.match_1_downstream = contigs_1.colibs[contig_pair[0]][i + 1].colib_id
for i, colib in enumerate(contigs_2.colibs[contig_pair[1]]):
if i == 0:
colib.match_2_downstream = contigs_2.colibs[contig_pair[1]][i + 1].colib_id
elif i + 1 == len(contigs_2.colibs[contig_pair[1]]):
colib.match_2_upstream = contigs_2.colibs[contig_pair[1]][i - 1].colib_id
# if len(contigs_2.colibs[contig_pair[1]]) == 2:
else:
colib.match_2_upstream = contigs_2.colibs[contig_pair[1]][i - 1].colib_id
colib.match_2_downstream = contigs_2.colibs[contig_pair[1]][i + 1].colib_id
def call_rearrangments(contigs_1, contigs_2, potentially_rearranged_contigs, pair):
## just doing this for contigs 1 is equivalent to doing it for both because
## the rearrangements are called in a pairwise fashion
contigs_with_rearrangements = []
for contig_pair in potentially_rearranged_contigs:
rearrangment = False
for colib in contigs_1.colibs[contig_pair[0]]:
## the strand matters, if same strain, then up should == up if no re-arrangment
if colib.match_1_strand == colib.match_2_strand:
## if down != down
if colib.match_1_downstream != colib.match_2_downstream:
## and if they are both not False (if one is false, means that there is no clb, so dont know about re-arrangment)
if False not in (colib.match_1_downstream, colib.match_2_downstream):
## then rearranement is true
rearrangment = True
contigs_with_rearrangements.append(contig_pair[0])
if colib.match_1_upstream != colib.match_2_upstream:
if False not in (colib.match_1_upstream, colib.match_2_upstream):
rearrangment = True
contigs_with_rearrangements.append(contig_pair[0])
## if opposite strands, up should == down if no re-arrangement
if colib.match_1_strand != colib.match_2_strand:
if colib.match_1_downstream != colib.match_2_upstream:
if False not in (colib.match_1_downstream, colib.match_2_upstream):
rearrangment = True
contigs_with_rearrangements.append(contig_pair[0])
if colib.match_1_downstream != colib.match_2_upstream:
if False not in (colib.match_1_downstream, colib.match_2_upstream):
rearrangment = True
contigs_with_rearrangements.append(contig_pair[0])
print '\t'.join(map(str, [pair[0], pair[1], contig_pair[0], contig_pair[1], rearrangment]))
# print
# print colib, 'colib'
# pprint.pprint(vars(colib))
# for second_colib in contigs_2.colibs[contig_pair[1]]:
# if second_colib.colib_id == colib.colib_id:
# print second_colib, 'second_colib'
# pprint.pprint(vars(second_colib))
## TODO - check that this number is correct
print 'There are %s contigs with re-arrangements between %s' % (len(set(contigs_with_rearrangements)), pair)
def sort_colibs_within_contigs(contigs_1, contigs_2):
## sort all the colibs for both contig1 and colib.match_2_name
## even though the same instance of the colib class can be in >1 list
## not a problem, as just sorting the list, not changing the class instance itself.
## match_1_cons_pos[0]/match_2_cons_pos[0] is the start of each colib
for contig in contigs_1.colibs:
contigs_1.colibs[contig] = sorted(contigs_1.colibs[contig], key = lambda x:x.match_1_cons_pos[0])
for contig in contigs_2.colibs:
contigs_2.colibs[contig] = sorted(contigs_2.colibs[contig], key = lambda x:x.match_2_cons_pos[0])
return contigs_1, contigs_2
def identify_potential_rearrangements(contigs_1, contigs_2):
## contigs_1.colibs = {contig1:[list, of, colibs]}
potentially_rearranged_contigs = []
for contig in contigs_1.colibs:
# if contig == 'contig19':
## does the contig have more than one colib?
if len(contigs_1.colibs[contig]) > 1:
# tmp = [x for x in contigs_1.colibs[contig] if x.match_1_name != contig]
# print contig, len(contigs_1.colibs[contig]), len(tmp)
## contigs_1.colibs[contig] is a list of all the colibs for that contig
## for each colib in that contig
for colib in contigs_1.colibs[contig]:
## i dont know why colib.match_2_name can still be none here...
if colib.match_2_name == None:
continue
## if the contig which is matched to contigs_1 has more than one colib
if len(contigs_2.colibs[colib.match_2_name]) > 1:
## print it
# print contig, colib.match_2_name
potentially_rearranged_contigs.append([contig, colib.match_2_name])
# check_colibs_within_contigs(contigs_1, contigs_2, contig)
# pprint.pprint(potentially_rearranged_contigs)
return potentially_rearranged_contigs
def run_find_rearrangements(contig_1_handle, contig_2_handle, xmfa_handle, pair):
## this creates e.g. contigs_1.contig_range which = {contig1:[1,300000], ...}
contigs_1 = Contigs(contig_1_handle)
contigs_2 = Contigs(contig_2_handle)
## this outputs a list of co-linear blocks which are present in both samples
paired_colibs = read_in_xmfa(xmfa_handle)
# pprint.pprint(contigs_1.contig_range)
for colib in paired_colibs:
## for each colib, assign it to a contig in each assembly
colib.assign_to_contig(contigs_1, contigs_2)
# print contig_1_handle, contig_2_handle
## sort the colibs by the first position in the colib. not sure that its necassary, but good to double check
contigs_1, contigs_2 = sort_colibs_within_contigs(contigs_1, contigs_2)
## find pairs of contigs which are linked by a colib, where both contigs have more than one colib
potentially_rearranged_contigs = identify_potential_rearrangements(contigs_1, contigs_2)
for contig_pair in potentially_rearranged_contigs:
## for each colib, figure out what is upstream and downstream of it
get_up_downstream_colib(contigs_1, contigs_2, contig_pair)
## check that for each colib, whether the upstream/downstream clbs are consistent with a rearrangement or not.
call_rearrangments(contigs_1, contigs_2, potentially_rearranged_contigs, pair)
def get_all_pairs(todo_list):
return list(itertools.combinations(todo_list, 2))
def run_mauve(contig_1_handle, contig_2_handle, output_dir):
sample_name_1 = contig_1_handle.split('/')[-1].rstrip('.fa')
sample_name_2 = contig_2_handle.split('/')[-1].rstrip('.fa')
#os.system('progressiveMauve --output {0}/st93_{1}_vs_{2}.xmfa {3} {4}'.format(output_dir, sample_name_1, sample_name_2, contig_1_handle, contig_2_handle))
return '{0}/{1}_vs_{2}.xmfa'.format(output_dir, sample_name_1, sample_name_2)
def main():
contig_1_handle, contig_2_handle, output_dir = get_args()
xmfa_handle = run_mauve(contig_1_handle, contig_2_handle, output_dir)
run_find_rearrangements(contig_1_handle, contig_2_handle, xmfa_handle, pair)
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment