Last active
July 30, 2019 04:18
-
-
Save meren/052e283343030e2a282702bcb7894253 to your computer and use it in GitHub Desktop.
Poor man's redundancy removal scripts. Until we incorporate this logic into anvi'o.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# you run this first: | |
# | |
# python 01-gen-nucmer-jobs.py affiliations.txt MAGs_input_dir | |
# | |
import os | |
import sys | |
import anvio.fastalib as u | |
import anvio.utils as utils | |
# Read the two-column input: each line maps a MAG name to an affiliation
# (no header line expected).
affiliations_input = utils.get_TAB_delimited_file_as_dictionary(sys.argv[1], no_header=True, column_names=['item', 'affiliation'])

# Group MAG names by their affiliation.
affiliations_dict = {}
for mag in affiliations_input:
    affiliation = affiliations_input[mag]['affiliation']
    affiliations_dict.setdefault(affiliation, []).append(mag)

for affiliation in affiliations_dict:
    # One database FASTA per affiliation; every MAG in the group becomes a
    # single concatenated sequence entry in it.
    db_path = '%s.fa' % affiliation
    # `with` guarantees the database file is closed even if a MAG FASTA is broken
    with open(db_path, 'w') as output:
        for mag in affiliations_dict[affiliation]:
            path = '%s/%s.fa' % (sys.argv[2], mag)
            fasta = u.SequenceSource(path)

            # collect contig sequences and join once (avoids quadratic `+=`)
            chunks = []
            while next(fasta):
                chunks.append(fasta.seq)
            fasta.close()

            output.write('>%s\n' % mag)
            output.write('%s\n' % ''.join(chunks))

            # emit the nucmer job for this MAG against its affiliation database
            print('nucmer --minmatch 500 --prefix %s %s %s' % (os.path.join('hits', mag), path, db_path))
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# you run this next: | |
# | |
# python 02-estimate-similarity.py mag_lengths.txt output_percent_identities.txt | |
# | |
import sys | |
import glob | |
import numpy | |
import anvio.utils as utils | |
def get_matching_stats(query_genome_name, delta_file_path, genome_lengths, program="show-coords"):
    """Parse `show-coords` output for a nucmer delta file and summarize matches.

    Parameters
    ==========
    query_genome_name : str
        Name of the query genome; self-matches (target == query) are skipped.
    delta_file_path : str
        Path to a nucmer .delta file.
    genome_lengths : dict
        Maps genome name to a dict with a 'total_length' key.
    program : str
        Name of the coordinate-dumping program (must be in PATH).

    Returns a list of dicts, one per target genome the query aligned to, with
    the matching nucleotide count and the alignment-length-weighted average
    percent identity.
    """
    utils.is_program_exists(program)

    command = '%s -H -T %s' % (program, delta_file_path)
    output, ret_val = utils.get_command_output_from_shell(command)
    output = output.decode("utf-8")

    # target genome name -> list of (alignment_length, percent_identity)
    matches = {}

    for line in output.split('\n'):
        if not line:
            continue

        fields = line.split('\t')
        if len(fields) != 9:
            # FIX: this used to be a bare `raise` outside an except block,
            # which itself raises a RuntimeError with no context
            raise ValueError("Unexpected line in '%s' output (expected 9 TAB-delimited fields): %s" % (program, line))

        # -H -T columns: S1 E1 S2 E2 LEN1 LEN2 %IDY TAG1 TAG2; the start/end
        # coordinates (fields 0-3) are not needed for this summary
        alignment_length_target = int(fields[4])
        alignment_length_query = int(fields[5])
        percent_identity = float(fields[6])
        target = str(fields[8])

        if query_genome_name == target:
            continue

        # be conservative: count only the shorter side of the alignment
        alignment_length = min(alignment_length_query, alignment_length_target)
        matches.setdefault(target, []).append((alignment_length, percent_identity))

    query_genome_length = genome_lengths[query_genome_name]['total_length']

    summary = []
    for target_genome_name in matches:
        target_genome_length = genome_lengths[target_genome_name]['total_length']

        lengths = [m[0] for m in matches[target_genome_name]]
        identities = [m[1] for m in matches[target_genome_name]]

        # the weighted mean replaces the old trick of expanding each identity
        # `length` times into a flat list (we remain thankful to Naíla for
        # fixing the original version of this computation)
        matching_nucleotides = sum(lengths)
        average_identity = numpy.average(identities, weights=lengths)

        summary.append({'query_genome_name': query_genome_name,
                        'query_genome_length': query_genome_length,
                        'target_genome_name': target_genome_name,
                        'target_genome_length': target_genome_length,
                        'matching_nucleotides': matching_nucleotides,
                        'average_identity': average_identity})

    return summary
def main():
    """Summarize nucmer hits for every delta file under hits/ into one table.

    argv[1] is a TAB-delimited table of genome lengths; argv[2] is the output
    path for the redundancy summary.
    """
    delta_file_paths = glob.glob('hits/*.delta')
    lengths = utils.get_TAB_delimited_file_as_dictionary(sys.argv[1], column_mapping=[str, int])

    headers = ['query_genome_name', 'query_genome_length', 'target_genome_name', 'target_genome_length', 'matching_nucleotides', 'average_identity']

    no_redundancy_info_recovered = True

    # `with` guarantees the output file is closed even if parsing fails
    with open(sys.argv[2], 'w') as output:
        output.write('%s\n' % '\t'.join(headers))

        for delta_file_path in delta_file_paths:
            # hits/FOO.delta -> FOO (strip the directory and final extension)
            genome_name = '.'.join(delta_file_path.split('/')[-1].split('.')[:-1])

            summary = get_matching_stats(genome_name, delta_file_path, lengths)
            if len(summary):
                no_redundancy_info_recovered = False

            for e in summary:
                output.write("%(query_genome_name)s\t%(query_genome_length)d\t%(target_genome_name)s\t%(target_genome_length)d\t%(matching_nucleotides)d\t%(average_identity)f\n" % e)

    if no_redundancy_info_recovered:
        print("None of your MAGs were redundant :/ So the output file '%s' is going to be empty (and irrelevant for the next steps of your analysis)" % sys.argv[2])


if __name__ == '__main__':
    main()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python
# -*- coding: utf-8
"""Generate the combined data table that feeds the redundancy analysis."""

import sys
import redundancy as r
from anvio.errors import ConfigError, FilesNPathsError

__author__ = "Alon Shaiber"
__copyright__ = "Copyright 2017, The anvio Project"
__credits__ = []
__license__ = "GPL 3.0"
# NOTE(review): a version string is expected here, not a list — TODO confirm intended value
__version__ = []
__maintainer__ = "Alon Shaiber"
__email__ = "alon.shaiber@gmail.com"
__status__ = "Development"


if __name__ == '__main__':
    import argparse

    parser = argparse.ArgumentParser(description='A short helper script to generate a table that is ready for redundancy analysis (the next step is export-non-redundant-genomes.py)')

    group0 = parser.add_argument_group('ESSENTIAL INPUTS', "You must supply the following TAB-delimited files.")
    group1 = parser.add_argument_group('ESSENTIAL OUTPUT', "You must supply a name for the output file.")

    group0.add_argument('-m', '--metadata', metavar='PATH', default=None, help='A TAB-delimited file with metadata regarding each genome. It has to contain the following columns: name, length, completion, redundancy, domain.')
    group0.add_argument('-C', '--correlation', metavar='PATH', default=None, dest='correlation', help='A TAB-delimited file containing the correlation information between the occurance of pairs of genomes. It must contain the following columns: key, item_1, item_2, correlation.')
    group0.add_argument('-i', '--input', metavar='PATH', default=None, dest='matching', help='A TAB-delimited file with alignment information for pairs of genomes (for example an output file from Nucmer). The file has to include the following columns: query_genome_name, query_genome_length, target_genome_name, target_genome_length, matching_nucleotides, average_nucleotide_identity.')
    group1.add_argument('-o','--output', metavar='PATH', default=None, help='Name of output file for the data table')

    args = parser.parse_args()

    # (an unused `A = lambda ...` accessor was removed here)
    try:
        d = r.Redundancy(args)
        d.store_data_as_TAB_delimited_file(args.output)
    except ConfigError as e:
        # anvio errors carry a user-facing message; report on stderr, exit non-zero
        print(e, file=sys.stderr)
        sys.exit(-1)
    except FilesNPathsError as e:
        print(e, file=sys.stderr)
        sys.exit(-2)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python
# -*- coding: utf-8
"""Find redundant genomes from a combined data table and report groups."""

import sys
import redundancy as r
from anvio.errors import ConfigError, FilesNPathsError

__author__ = "Alon Shaiber"
__copyright__ = "Copyright 2017, The anvio Project"
__credits__ = []
__license__ = "GPL 3.0"
# NOTE(review): a version string is expected here, not a list — TODO confirm intended value
__version__ = []
__maintainer__ = "Alon Shaiber"
__email__ = "alon.shaiber@gmail.com"
__status__ = "Development"


if __name__ == '__main__':
    import argparse

    parser = argparse.ArgumentParser(description='A script to find redundant genomes.')

    group0 = parser.add_argument_group('ESSENTIAL INPUTS', "You must supply the following TAB-delimited files.")
    group1 = parser.add_argument_group('ESSENTIAL OUTPUT', "You must supply a name for the output file.")
    group2 = parser.add_argument_group('ADDITIONAL ARGUMENTS', "You can set the different thresholds for determining redundancy.")

    group0.add_argument('-m', '--metadata', metavar='PATH', default=None, help='A TAB-delimited file with metadata regarding each genome. It has to contain the following columns: name, length, completion, redundancy, substantive_q, domain.')
    group0.add_argument('-d', '--data', metavar='PATH', default=None, dest='data', help='A TAB-delimited file containing all the following columns: item_1, item_2, correlation, query_genome_length, target_genome_length, average_nucleotide_identity, percent_alignment, matching_nucleotides, item_1_substantive_q, item_2_substantive_q, domain.')
    group1.add_argument('-o', '--output-file', metavar='PATH', default=None, help='File path to store results.')
    group2.add_argument('-pa', '--percent-alignment-threshold', metavar='NUM', type=float, default=75, help='Cut-off value for percent alignment. Percent alignment is the ratio between the alignment length and the shorter genome. The default is 75.')
    group2.add_argument('-ani', '--average-nucleotide-identity', metavar='NUM', type=float, default=99, help='Cut-off value for average nucleotide identity. The default is 99.')
    group2.add_argument('-pc', '--correlation-threshold', metavar='NUM', type=float, default=0.9, help='Cut-off value for correlation. The default is 0.9.')

    args = parser.parse_args()

    # safe accessor: returns None for arguments that were never defined
    A = lambda x: args.__dict__[x] if x in args.__dict__ else None

    try:
        d = r.Redundancy(args)
        d.resolve_redundancy()
        d.store_redundancy_output(A('output_file'))
    except ConfigError as e:
        # anvio errors carry a user-facing message; report on stderr, exit non-zero
        print(e, file=sys.stderr)
        sys.exit(-1)
    except FilesNPathsError as e:
        print(e, file=sys.stderr)
        sys.exit(-2)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 | |
# pylint: disable=line-too-long | |
"""functions to deal with redundancy in a collection of genomes (be them s, draft or full genomes) """ | |
import csv | |
from anvio import utils as u | |
from anvio.errors import ConfigError | |
__author__ = "Alon Shaiber" | |
__copyright__ = "Copyright 2017, The anvio Project" | |
__credits__ = [] | |
__license__ = "GPL 3.0" | |
__version__ = [] | |
__maintainer__ = "Alon Shaiber" | |
__email__ = "alon.shaiber@gmail.com" | |
__status__ = "Development" | |
# Column names are not consistent across the various input tables; this table
# maps each legacy header to the canonical key name used throughout the module.
keys_to_fix = {'query_genome_name': 'item_1',
               'query_genome_length': 'item_1_length',
               'target_genome_name': 'item_2',
               'target_genome_length': 'item_2_length',
               'MAG_1': 'item_1',
               'MAG_2': 'item_2',
               'MAG_Id': 'item_ID',
               'domain_tod': 'domain',
               }


def fix_keys(dictionary):
    """Rename any legacy keys in `dictionary` (in place) and return it."""
    for legacy_name, canonical_name in keys_to_fix.items():
        if legacy_name in dictionary:
            dictionary[canonical_name] = dictionary.pop(legacy_name)

    return dictionary
class pair:
    """A pair of genomes and everything known about their relationship.

    The `dictionary` attribute holds the following keys (initialized to None
    when unknown): item_1, item_2, correlation, item_1_length, item_2_length,
    matching_nucleotides, percent_alignment, average_nucleotide_identity,
    item_1_substantive_q, item_2_substantive_q, domain.
    """

    def __init__(self, unique_id=0, dictionary=None):
        if dictionary is None:
            self.dictionary = {}
        else:
            # normalize legacy column names first
            self.dictionary = fix_keys(dictionary)

        list_of_keys = ['item_1', 'item_2', 'correlation', 'item_1_length', 'item_2_length', 'matching_nucleotides', 'percent_alignment', 'average_nucleotide_identity', 'item_1_substantive_q', 'item_2_substantive_q', 'domain']
        self.unique_id = unique_id

        # make sure every expected key exists, even if no value is known yet
        for key in list_of_keys:
            if key not in self.dictionary:
                self.dictionary[key] = None

    def update_attribute(self, key, value):
        """Set a single attribute of the pair."""
        self.dictionary[key] = value

    def update_pair_from_dictionary(self, d):
        """Copy every key/value pair in `d` into this pair's dictionary."""
        for key in d:  # TODO: check that these keys exist in dictionary.keys()
            self.dictionary[key] = d[key]

    def update_metadata_for_genome(self, genome, genome_name, genome_position):
        """Record name, length and substantive_q for one side of the pair.

        `genome_position` is either 'item_1' or 'item_2'; `genome` is a
        metadata dict with 'length', 'completion' and 'redundancy' keys.
        """
        self.update_attribute(genome_position, genome_name)
        self.update_attribute(genome_position + '_length', genome['length'])
        # substantive_q is completion minus redundancy
        self.update_attribute(genome_position + '_substantive_q', float(genome['completion']) - float(genome['redundancy']))  # TODO: add weights x,y to x * Completion - y * redundancy

    def update_metadata(self, query_genome, target_genome, metadata):
        """Fill in metadata for both genomes of the pair.

        Raises ConfigError if the two genomes are not from the same domain
        (they should never have been compared in the first place).
        """
        query_genome_dict = fix_keys(metadata[query_genome])
        target_genome_dict = fix_keys(metadata[target_genome])

        if query_genome_dict['domain'] != target_genome_dict['domain']:
            # FIX: the format arguments were previously in the wrong order,
            # which produced a garbled error message
            raise ConfigError("domain of %s is %s and is not the same of domain of %s, which is %s, yet they were compared in %s" % (query_genome, query_genome_dict['domain'], target_genome, target_genome_dict['domain'], self))

        self.update_metadata_for_genome(query_genome_dict, query_genome, 'item_1')
        self.update_metadata_for_genome(target_genome_dict, target_genome, 'item_2')
        self.update_attribute('domain', query_genome_dict['domain'])
def test_args():
    """Default file names used when Redundancy is constructed without args."""
    return {'output-file': 'test-out.txt',
            'metadata': 'test-metadata.txt',
            'correlation': 'test-correlation.txt',
            'matching': 'test-matching.txt',
            'data_output': 'test-data.txt'}
class Redundancy:
    """Find groups of redundant genomes and pick a representative per group.

    The metadata, matching (alignment) and correlation tables are combined
    into `self.data` (one entry per genome pair); `resolve_redundancy()`
    clusters the pairs that pass all thresholds and picks a representative
    genome for each resulting group.
    """

    def __init__(self, args=None):
        if args is None:
            args_dict = test_args()
            A = lambda x: args_dict[x] if x in args_dict else None
        else:
            A = lambda x: args.__dict__[x] if x in args.__dict__ else None

        metadata = u.get_TAB_delimited_file_as_dictionary(A('metadata'))
        self.group_list = group_list(metadata.keys())
        self.data = {}
        self.metadata = metadata
        # FIX: a second, discarded read of the metadata file was removed here

        # thresholds (None unless provided via the CLI)
        self.percent_alignment_threshold = A('percent_alignment_threshold')
        self.average_nucleotide_identity = A('average_nucleotide_identity')
        self.correlation_threshold = A('correlation_threshold')

        if A('data') is not None:
            # hopefully a compatible table was provided by the user
            self.data = u.get_TAB_delimited_file_as_dictionary(A('data'))
        else:
            # build the data table from the metadata, matching, and
            # correlation tables
            self.create_data_from_tables(args)

    def get_list_of_group_names(self):
        """Generate zero-padded candidate group names (group_01, group_02, ...).

        Used to renumber groups densely, since merging groups can leave gaps
        in the original numbering.
        """
        N_items = len(self.metadata)
        N_zeros = len(str(N_items))
        # FIX: was range(1, N_items), which produced one name too few
        return ['group_' + str(i).zfill(N_zeros) for i in range(1, N_items + 1)]

    def store_redundancy_output(self, output_file):
        """Write one row per redundant genome: group name, rep flag, group size."""
        redundancy_genome_dict = {}
        headers = ['item', 'redundancy_group_name', 'representative_genome', 'replicates']

        # rename the groups so their numbering is dense and meaningful
        group_dict = dict(zip(self.group_list.get_groups(), self.get_list_of_group_names()))
        for Mygroup in group_dict:
            genomes_in_group = self.group_list.get_genomes_from_group(Mygroup)
            for Mygenome in genomes_in_group:
                redundancy_genome_dict[Mygenome] = {'redundancy_group_name': group_dict[Mygroup],
                                                    'representative_genome': int(Mygenome == self.group_list.get_group_rep(Mygroup)),
                                                    'replicates': len(genomes_in_group)}

        if redundancy_genome_dict:
            u.store_dict_as_TAB_delimited_file(redundancy_genome_dict, output_file, headers)
        else:
            print('No redundant genomes were found')

    def create_data_from_tables(self, args=None):
        """Populate self.data with one entry per genome pair.

        Combines the matching and correlation tables, keeping only the best
        alignment when a reciprocal alignment exists for a pair.
        """
        if args is None:
            args_dict = test_args()
            A = lambda x: args_dict[x]
        else:
            A = lambda x: args.__dict__[x] if x in args.__dict__ else None

        metadata = self.metadata
        self.data = {}

        # FIX: both tables are required below; fail with a clear message
        # instead of a NameError when either is missing
        if A('matching') is None or A('correlation') is None:
            raise ConfigError("create_data_from_tables requires both a matching table (-i) and a correlation table (-C)")

        matching = my_dictionary(A('matching'), 'matching')
        correlation = my_dictionary(A('correlation'), 'correlation')

        unique_id = 0
        for genome_1 in matching.d:
            # iterate over a snapshot: reciprocal entries may be deleted below
            for genome_2 in list(matching.d[genome_1]):
                p = pair(unique_id)
                match_dict = matching.d[genome_1][genome_2]
                query_genome = genome_1
                target_genome = genome_2

                if matching.d.get(genome_2, dict()).get(genome_1, None):
                    # a reciprocal alignment exists for these two genomes
                    if float(matching.d[genome_1][genome_2]['average_nucleotide_identity']) < float(matching.d[genome_2][genome_1]['average_nucleotide_identity']):
                        # keep the best alignment only
                        match_dict = matching.d[genome_2][genome_1]
                        query_genome = genome_2
                        target_genome = genome_1
                    del matching.d[genome_2][genome_1]  # the reciprocal entry is now redundant

                # alignment results, correlation results, then metadata
                p.update_pair_from_dictionary(match_dict)
                p.update_pair_from_dictionary(correlation.d[genome_1][genome_2])
                p.update_metadata(query_genome, target_genome, metadata)

                self.data[p.unique_id] = p.dictionary
                unique_id += 1

    def store_data_as_TAB_delimited_file(self, output_file):
        """Save the combined per-pair table."""
        u.store_dict_as_TAB_delimited_file(self.data, output_file)

    def resolve_redundancy(self):
        """Cluster redundant pairs into groups, then pick representatives."""
        for unique_id in self.data:
            entry = self.data[unique_id]
            # a pair is redundant when correlation, percent alignment, and
            # average nucleotide identity all pass their thresholds
            if float(entry['correlation']) >= self.correlation_threshold \
                    and float(entry['percent_alignment']) >= self.percent_alignment_threshold \
                    and float(entry['average_nucleotide_identity']) >= self.average_nucleotide_identity:
                self.group_list.connect_genomes(entry['item_1'], entry['item_2'], entry['domain'])

        # pick all the representatives
        for Mygroup in self.group_list.get_groups():
            self.pick_rep(Mygroup)

    def pick_rep(self, Mygroup):
        """Scan the genomes of a group and keep the best one as representative."""
        Mydomain = self.group_list.get_group_domain(Mygroup)
        for Mygenome in self.group_list.get_genomes_from_group(Mygroup):
            if self.is_genome_better_than_rep(Mygroup, Mygenome, Mydomain):
                self.set_genome_as_rep(Mygroup, Mygenome)

    def is_genome_better_than_rep(self, Mygroup, Mygenome, Mydomain):
        """True if `Mygenome` should replace the group's current representative.

        Eukaryotes: the longer genome wins. Bacteria/Archaea: the higher
        substantive_q (completion - redundancy) wins; length breaks ties.
        NOTE(review): any other domain value falls through and returns None
        (falsy), preserving the original behavior — TODO confirm intended.
        """
        current_group_rep = self.group_list.get_group_rep(Mygroup)
        if current_group_rep is None:
            # no representative yet: anything is better than nothing
            return True

        Mygenome_is_longer_than_rep = int(self.metadata[Mygenome]['length']) > int(self.metadata[current_group_rep]['length'])

        if Mydomain in ('Eukaryote', 'Eukaryota'):
            return Mygenome_is_longer_than_rep
        elif Mydomain in ('Bacteria', 'Archaea'):
            current_substantive_q = float(self.metadata[current_group_rep]['completion']) - float(self.metadata[current_group_rep]['redundancy'])
            Mygenome_substantive_q = float(self.metadata[Mygenome]['completion']) - float(self.metadata[Mygenome]['redundancy'])

            if Mygenome_substantive_q != current_substantive_q:
                return Mygenome_substantive_q > current_substantive_q

            # equal substantive_q: the longer genome wins
            return Mygenome_is_longer_than_rep

    def set_genome_as_rep(self, Mygroup, Mygenome):
        """Make `Mygenome` the representative of `Mygroup`."""
        self.group_list.set_group_rep(Mygroup, Mygenome)
class group_list:
    """Registry of redundancy groups and the genomes assigned to them."""

    def __init__(self, group_list=None):
        if group_list is None:
            group_list = []
        # group hash -> group object
        self.group_list = {}
        # genome name -> hash of the group it belongs to (None if unassigned)
        self.redundant_genomes = dict.fromkeys(group_list)
        self.number_of_redundancy_groups = 0
        self.number_of_items = len(group_list)

    def is_redundant(self, genome):
        """True if the genome has already been assigned to a group."""
        if self.redundant_genomes[genome] is None:
            return False
        else:
            return True

    def get_group_domain(self, hash1):
        """Return the domain recorded for a group."""
        return self.group_list[hash1].get_domain()

    def create(self, output_path, headers=None, file_obj=None):
        # NOTE(review): `self.data` is never set on this class, so calling
        # this method raises AttributeError; it looks like a copy-paste
        # leftover from Redundancy. Kept as-is to avoid an interface change —
        # TODO confirm and remove or repair.
        u.store_dict_as_TAB_delimited_file(self.data, output_path, headers, file_obj)

    def merge_groups(self, hash1, hash2):
        """Absorb every genome of group `hash2` into group `hash1`, then drop `hash2`."""
        self.group_list[hash1].extend_genome_list(self.group_list[hash2].get_genome_list())
        for genome in self.group_list[hash2].get_genome_list():
            self.redundant_genomes[genome] = hash1
        del self.group_list[hash2]

    def get_genomes_from_group(self, hash1):
        """Return the list of genomes in a group."""
        return self.group_list[hash1].get_genome_list()

    def get_groups(self):
        """Return a view of all the group hashes."""
        return self.group_list.keys()

    def get_group_rep(self, hash1):
        """Return the representative genome of a group (None if unset)."""
        return self.group_list[hash1].get_rep()

    def set_group_rep(self, hash1, genome1):
        """Set the representative genome of a group."""
        return self.group_list[hash1].set_rep(genome1)

    def connect_genomes(self, genome1, genome2, domain):
        """Record that genome1 and genome2 are redundant with each other.

        Depending on what is already known, this creates a new group, adds
        one genome to the other's group, or merges two existing groups.
        """
        if not self.is_redundant(genome1):
            if not self.is_redundant(genome2):
                # neither genome is grouped yet: open a new group
                self.number_of_redundancy_groups += 1
                # FIX: the original comment said "always adding at least 2
                # zeros" but the code capped the padding at 2 instead of
                # flooring it; use a minimum width of 2
                zero_fill = max(len(str(self.number_of_items)), 2)
                redundancy_group_number = 'group_' + str(self.number_of_redundancy_groups).zfill(zero_fill)
                self.group_list[redundancy_group_number] = group(redundancy_group_name=redundancy_group_number, genome_list=[genome1, genome2], domain=domain)
                self.redundant_genomes[genome1] = redundancy_group_number
                self.redundant_genomes[genome2] = redundancy_group_number
            else:
                # genome2 is already grouped: add genome1 to its group
                redundancy_group_number = self.redundant_genomes[genome2]
                self.redundant_genomes[genome1] = redundancy_group_number
                self.group_list[redundancy_group_number].add_genome(genome1)
        else:
            if not self.is_redundant(genome2):
                # genome1 is already grouped: add genome2 to its group
                redundancy_group_number = self.redundant_genomes[genome1]
                self.redundant_genomes[genome2] = redundancy_group_number
                self.group_list[redundancy_group_number].add_genome(genome2)
            elif self.redundant_genomes[genome1] != self.redundant_genomes[genome2] and self.redundant_genomes[genome1] is not None:
                # both grouped, but in different groups: merge them
                self.merge_groups(self.redundant_genomes[genome1], self.redundant_genomes[genome2])
class group:
    """A single redundancy group: its member genomes, representative, and domain."""

    def __init__(self, genome_list, redundancy_group_name, representative=None, domain=None):
        self.genome_list = genome_list
        self.representative = representative
        self.domain = domain

    def set_rep(self, representative):
        """Choose the group's representative genome."""
        self.representative = representative

    def get_rep(self):
        """Return the current representative (None if not yet chosen)."""
        return self.representative

    def add_genome(self, genome):
        """Add a single genome to the group."""
        self.genome_list.append(genome)

    def extend_genome_list(self, new_list):
        """Append every genome in `new_list` to the group."""
        self.genome_list.extend(new_list)

    def get_genome_list(self):
        """Return the list of member genomes."""
        return self.genome_list

    def get_domain(self):
        """Return the group's domain (None if unknown)."""
        return self.domain
class my_dictionary:
    """Nested lookup built from a TAB-delimited pair table.

    After construction, `self.d[item_1][item_2]` holds either alignment
    statistics or a correlation value, depending on `dict_type`
    ('matching' or 'correlation'). Column names are normalized via fix_keys.
    """

    def __init__(self, my_file, dict_type):
        self.d = {}
        with open(my_file, 'r') as f:
            reader = csv.DictReader(f, delimiter='\t')
            for row in reader:
                row = fix_keys(row)
                # make sure the outer dict has a slot for this item_1
                self.d.setdefault(row['item_1'], {})
                if dict_type == 'matching':
                    self.add_matching_results_to_dictionary(row)
                elif dict_type == 'correlation':
                    self.add_correlation_results_to_dictionary(row)

    def add_matching_results_to_dictionary(self, row):
        """Store alignment stats and the derived percent_alignment for a pair."""
        entry = {}
        entry['matching_nucleotides'] = int(row['matching_nucleotides'])
        entry['average_nucleotide_identity'] = float(row['average_nucleotide_identity'])
        # percent alignment is relative to the shorter genome of the pair
        entry['percent_alignment'] = 100 * float(row['matching_nucleotides']) / min(int(row['item_1_length']), int(row['item_2_length']))
        self.d[row['item_1']][row['item_2']] = entry

    def add_correlation_results_to_dictionary(self, row):
        """Store the occurrence correlation for a pair."""
        self.d[row['item_1']][row['item_2']] = {'correlation': float(row['correlation'])}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment