Last active
December 20, 2019 11:01
-
-
Save endrebak/ec06a92519e84f669b143ec4b936f9d9 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def calc_diag_lean(self, out_fname, out_delim, dynamic_delete=True): | |
if dynamic_delete == False: | |
raise Exception('Error: Conversion has been run in lean mode, but with dynamically=False.') | |
self.dynamic_delete = dynamic_delete | |
flat.print_log_msg('Start') | |
# pre-read all relevant partitions at beginning! | |
last_p_num = -1 | |
for p_num_init in range(0, len(self.partitions)-1): | |
if self.snp_first >= self.partitions[p_num_init+1][0]: | |
flat.print_log_msg('Pre-reading partition: '+str(self.partitions[p_num_init])) | |
flat.read_partition_into_matrix_lean(self.partitions, p_num_init, self.matrix, | |
self.locus_list, self.name, self.input_config, | |
self.snp_first, self.snp_last) | |
last_p_num = p_num_init | |
else: | |
break | |
curr_locus = -1 | |
for p_num in range(last_p_num+1, len(self.partitions)): | |
p = self.partitions[p_num] | |
flat.print_log_msg('Reading partition: '+str(p)) | |
flat.read_partition_into_matrix_lean(self.partitions, p_num, self.matrix, | |
self.locus_list, self.name, self.input_config, | |
self.snp_first, self.snp_last) | |
# Determine first locus | |
if curr_locus<0: # Either first partition or not found in first partition | |
# curr_locus = -1 # <- this should have been set to -1 before entering the main for loop | |
if len(self.locus_list)>0: | |
# Find first locus >= snp_first | |
for i, locus in enumerate(self.locus_list): | |
if locus >= self.snp_first: | |
curr_locus = locus | |
start_locus = locus | |
curr_locus_index = i | |
start_locus_index = i | |
break | |
else: | |
raise Exception('Error: locus_list seems to be empty') | |
else: | |
try: | |
curr_locus_index = self.locus_list.index(curr_locus) | |
# curr_locus is carried from prev iteration, but index has changed since part of matrix (and locus_list) has been deleted | |
except ValueError: | |
if len(self.locus_list)>0: | |
curr_locus = self.locus_list[0] | |
curr_locus_index = 0 | |
else: | |
raise Exception('Error: locus_list seems to be empty') | |
if curr_locus<0: | |
flat.print_log_msg('Warning: curr_locus not found! Continuing to next partition.') | |
flat.print_log_msg('Comment: This is possibly due to snp_first being very close to end of partition.') | |
flat.print_log_msg('Details: ') | |
flat.print_log_msg('Partition: '+repr(p)) | |
flat.print_log_msg('snp_first: '+repr(self.snp_first)) | |
flat.print_log_msg('curr_locus: '+repr(curr_locus)) | |
continue #continue to next partition | |
# Determine end locus | |
if p_num+1 < len(self.partitions): | |
end_locus = int((self.partitions[p_num][1] + self.partitions[p_num+1][0]) / 2) # diag - specific | |
else: | |
# Find last locus <= snp_last | |
end_locus_found = False | |
for i in reversed(range(0, len(self.locus_list))): | |
# for locus in reversed(locus_list): | |
if self.locus_list[i] <= self.snp_last: | |
end_locus = self.locus_list[i] | |
end_locus_index = i | |
end_locus_found = True | |
break | |
if not end_locus_found: | |
end_locus_index = 0 | |
end_locus = self.locus_list[end_locus_index] | |
flat.print_log_msg('Running for partition: '+str(p)) | |
# This will not include the very last SNP of the complete range, but that shouldn't be too important since the end of the range shouldn't be a defining location for LD | |
while curr_locus <= end_locus: | |
x = self.locus_list[curr_locus_index] | |
y = self.locus_list[curr_locus_index] | |
delta = 0 | |
while x >= self.partitions[p_num][0] and y <= self.partitions[p_num][1]: | |
if x in self.matrix and y in self.matrix[x]: | |
corr_coeff = self.matrix[x][y] / math.sqrt( self.matrix[x][x] * self.matrix[y][y] ) | |
self.add_corr_coeff(corr_coeff, curr_locus) | |
# Just save it in the matrix ;) - removed for chrom11 | |
if delta!=0: | |
x = self.locus_list[curr_locus_index-delta+1] | |
if x in self.matrix and y in self.matrix[x]: | |
corr_coeff = self.matrix[x][y] / math.sqrt( self.matrix[x][x] * self.matrix[y][y] ) | |
self.add_corr_coeff(corr_coeff, curr_locus) | |
# Just save it in the matrix ;) - removed for chrom11 | |
delta += 1 | |
if curr_locus_index-delta >= 0: | |
x = self.locus_list[curr_locus_index-delta] | |
else: | |
# flat.print_log_msg('X index out of bounds') | |
break | |
if curr_locus_index+delta < len(self.locus_list): | |
y = self.locus_list[curr_locus_index+delta] | |
else: | |
# flat.print_log_msg('Y index out of bounds') | |
break | |
if curr_locus_index+1 < len(self.locus_list): | |
curr_locus_index+=1 | |
curr_locus = self.locus_list[curr_locus_index] | |
else: | |
flat.print_log_msg('curr_locus_index out of bounds') | |
break | |
if self.dynamic_delete: | |
flat.print_log_msg('Deleting loci not required any more') | |
if p_num+1 < len(self.partitions): | |
delete_loc = self.partitions[p_num+1][0] | |
else: | |
delete_loc = end_locus | |
flat.delete_loci_smaller_than_lean(delete_loc, self.matrix, self.locus_list, | |
self.locus_list_deleted, out_fname, self.vert_sum, out_delim) | |
else: | |
flat.print_log_msg('locus_list size: '+repr(len(self.locus_list))) | |
self.start_locus = start_locus | |
self.start_locus_index = start_locus_index | |
self.end_locus = end_locus | |
self.end_locus_index = end_locus_index | |
self.calculation_complete = True |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment