Skip to content

Instantly share code, notes, and snippets.

@endrebak
Last active December 20, 2019 11:01
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save endrebak/ec06a92519e84f669b143ec4b936f9d9 to your computer and use it in GitHub Desktop.
Save endrebak/ec06a92519e84f669b143ec4b936f9d9 to your computer and use it in GitHub Desktop.
def calc_diag_lean(self, out_fname, out_delim, dynamic_delete=True):
if dynamic_delete == False:
raise Exception('Error: Conversion has been run in lean mode, but with dynamically=False.')
self.dynamic_delete = dynamic_delete
flat.print_log_msg('Start')
# pre-read all relevant partitions at beginning!
last_p_num = -1
for p_num_init in range(0, len(self.partitions)-1):
if self.snp_first >= self.partitions[p_num_init+1][0]:
flat.print_log_msg('Pre-reading partition: '+str(self.partitions[p_num_init]))
flat.read_partition_into_matrix_lean(self.partitions, p_num_init, self.matrix,
self.locus_list, self.name, self.input_config,
self.snp_first, self.snp_last)
last_p_num = p_num_init
else:
break
curr_locus = -1
for p_num in range(last_p_num+1, len(self.partitions)):
p = self.partitions[p_num]
flat.print_log_msg('Reading partition: '+str(p))
flat.read_partition_into_matrix_lean(self.partitions, p_num, self.matrix,
self.locus_list, self.name, self.input_config,
self.snp_first, self.snp_last)
# Determine first locus
if curr_locus<0: # Either first partition or not found in first partition
# curr_locus = -1 # <- this should have been set to -1 before entering the main for loop
if len(self.locus_list)>0:
# Find first locus >= snp_first
for i, locus in enumerate(self.locus_list):
if locus >= self.snp_first:
curr_locus = locus
start_locus = locus
curr_locus_index = i
start_locus_index = i
break
else:
raise Exception('Error: locus_list seems to be empty')
else:
try:
curr_locus_index = self.locus_list.index(curr_locus)
# curr_locus is carried from prev iteration, but index has changed since part of matrix (and locus_list) has been deleted
except ValueError:
if len(self.locus_list)>0:
curr_locus = self.locus_list[0]
curr_locus_index = 0
else:
raise Exception('Error: locus_list seems to be empty')
if curr_locus<0:
flat.print_log_msg('Warning: curr_locus not found! Continuing to next partition.')
flat.print_log_msg('Comment: This is possibly due to snp_first being very close to end of partition.')
flat.print_log_msg('Details: ')
flat.print_log_msg('Partition: '+repr(p))
flat.print_log_msg('snp_first: '+repr(self.snp_first))
flat.print_log_msg('curr_locus: '+repr(curr_locus))
continue #continue to next partition
# Determine end locus
if p_num+1 < len(self.partitions):
end_locus = int((self.partitions[p_num][1] + self.partitions[p_num+1][0]) / 2) # diag - specific
else:
# Find last locus <= snp_last
end_locus_found = False
for i in reversed(range(0, len(self.locus_list))):
# for locus in reversed(locus_list):
if self.locus_list[i] <= self.snp_last:
end_locus = self.locus_list[i]
end_locus_index = i
end_locus_found = True
break
if not end_locus_found:
end_locus_index = 0
end_locus = self.locus_list[end_locus_index]
flat.print_log_msg('Running for partition: '+str(p))
# This will not include the very last SNP of the complete range, but that shouldn't be too important since the end of the range shouldn't be a defining location for LD
while curr_locus <= end_locus:
x = self.locus_list[curr_locus_index]
y = self.locus_list[curr_locus_index]
delta = 0
while x >= self.partitions[p_num][0] and y <= self.partitions[p_num][1]:
if x in self.matrix and y in self.matrix[x]:
corr_coeff = self.matrix[x][y] / math.sqrt( self.matrix[x][x] * self.matrix[y][y] )
self.add_corr_coeff(corr_coeff, curr_locus)
# Just save it in the matrix ;) - removed for chrom11
if delta!=0:
x = self.locus_list[curr_locus_index-delta+1]
if x in self.matrix and y in self.matrix[x]:
corr_coeff = self.matrix[x][y] / math.sqrt( self.matrix[x][x] * self.matrix[y][y] )
self.add_corr_coeff(corr_coeff, curr_locus)
# Just save it in the matrix ;) - removed for chrom11
delta += 1
if curr_locus_index-delta >= 0:
x = self.locus_list[curr_locus_index-delta]
else:
# flat.print_log_msg('X index out of bounds')
break
if curr_locus_index+delta < len(self.locus_list):
y = self.locus_list[curr_locus_index+delta]
else:
# flat.print_log_msg('Y index out of bounds')
break
if curr_locus_index+1 < len(self.locus_list):
curr_locus_index+=1
curr_locus = self.locus_list[curr_locus_index]
else:
flat.print_log_msg('curr_locus_index out of bounds')
break
if self.dynamic_delete:
flat.print_log_msg('Deleting loci not required any more')
if p_num+1 < len(self.partitions):
delete_loc = self.partitions[p_num+1][0]
else:
delete_loc = end_locus
flat.delete_loci_smaller_than_lean(delete_loc, self.matrix, self.locus_list,
self.locus_list_deleted, out_fname, self.vert_sum, out_delim)
else:
flat.print_log_msg('locus_list size: '+repr(len(self.locus_list)))
self.start_locus = start_locus
self.start_locus_index = start_locus_index
self.end_locus = end_locus
self.end_locus_index = end_locus_index
self.calculation_complete = True
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment