Created
August 8, 2018 10:33
-
-
Save JoshGutman/b20a5f9d5fb18265ce3ba22737b66ea4 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""Finds and searches for patterns of nucleotides given loci. | |
Given a file containing loci, a file containing nucleotide calls of those loci, | |
a database to search in, and optionally an ignore threshold, this program | |
will calculate all possible patterns of nucleotides and search for their | |
occurrences in the database. | |
""" | |
import itertools | |
import argparse | |
import re | |
from Bio import SeqIO | |
def main(loci_file, file, database, ignore):
    """Driver function: wire the whole pipeline together.

    Reads the loci, tallies their nucleotide calls, expands those tallies
    into candidate patterns, searches the database for the patterns, and
    prints the hits.

    Args:
        loci_file (str): Path to a file containing one locus on each line.
        file (str): Path to file containing nucleotide calls.
        database (str): Path to file containing database.
        ignore (float): Minimum percentage threshold of a nucleotide for it
            to be considered in pattern calculation.

    Returns:
        None.
    """
    loci = get_loci(loci_file)
    patterns = get_patterns(loci, get_bases(loci, file), ignore)
    output_hits(search_database(patterns, database))
def get_loci(loci_file):
    """Read loci from an input file, one locus per line.

    Args:
        loci_file (str): Path to a file containing one locus on each line.

    Returns:
        list of str: Every line of the file with surrounding whitespace
        (including the newline) removed.
    """
    with open(loci_file, "r") as handle:
        return [line.strip() for line in handle]
def get_bases(loci, file):
    """Gets all bases that occur in each locus's nucleotide call.

    Args:
        loci (list of str): List of all loci.
        file (str): Path to file containing nucleotide calls (tab-separated;
            the locus key is the first two fields, the calls are field 5).

    Returns:
        dict:
            key (str): Locus name.
            value (dict):
                key (str): Base ("A", "C", "G", or "T").
                value (int): Number of occurrences of that base.

    Notes:
        Iterates over `file` one time. Rather than searching for multiple
        substrings in `file`, which could be very inefficient, each locus name
        in `file` is checked for existence in a dictionary containing the
        desired loci, with each check being O(1). This results in a runtime of
        O(N), where N is the number of lines in `file`.
    """
    loci_bases = {key: {"A": 0, "C": 0, "G": 0, "T": 0} for key in loci}
    with open(file, "r") as infile:
        for line in infile:
            fields = line.split("\t")
            # Guard against blank/malformed lines: the original indexed
            # fields[4] unconditionally and raised IndexError on any line
            # with fewer than five tab-separated fields.
            if len(fields) < 5:
                continue
            locus = "\t".join(fields[:2])
            if locus in loci_bases:
                for base in fields[4].upper():
                    if base in loci_bases[locus]:
                        loci_bases[locus][base] += 1
    return loci_bases
def get_patterns(loci, loci_bases, ignore_threshold):
    """Yield every possible pattern of bases across the given loci.

    Args:
        loci (list of str): List of all loci.
        loci_bases (dict): Output of get_bases (locus -> base counts).
        ignore_threshold (float): Minimum fraction of a locus's total calls
            that a base must exceed to be considered in pattern calculation.

    Yields:
        str: One candidate pattern (one character per locus, in `loci`
        order). A locus that never occurred in the nucleotide-call file
        contributes an "X" placeholder.
    """
    per_locus_choices = []
    for locus in loci:
        counts = loci_bases[locus]
        total = sum(counts.values())
        # Keep only bases whose share of this locus's calls clears the
        # threshold; a zero total means the locus was never seen.
        qualifying = [base for base, count in counts.items()
                      if total and count / total > ignore_threshold]
        per_locus_choices.append(qualifying or ["X"])
    for combination in itertools.product(*per_locus_choices):
        yield "".join(combination)
def search_database(patterns, database):
    """Searches database records for patterns, "X" acting as a wildcard.

    Args:
        patterns (iterable of str): All patterns (may be a generator, as
            produced by get_patterns).
        database (str): Path to database FASTA file.

    Returns:
        dict:
            key (str): Pattern.
            value (list of str): FASTA record ids where the pattern occurs.

    Notes:
        Efficiency is O(N * M), where N is number of records in database and
        M is number of patterns.
    """
    # Materialize first: get_patterns is a generator, and the original code
    # exhausted it while building `hits`, leaving the per-record search loop
    # with an empty iterable — no pattern ever matched anything.
    patterns = list(patterns)
    hits = {key: [] for key in patterns}
    # Compile each wildcard regex once, instead of rebuilding it per record.
    compiled = [(pattern, re.compile(pattern.replace("X", ".")))
                for pattern in patterns]
    for record in SeqIO.parse(database, "fasta"):
        sequence = str(record.seq)
        for pattern, regex in compiled:
            if regex.search(sequence):
                hits[pattern].append(str(record.id))
    return hits
def output_hits(hits):
    """Outputs the hits found in the database.

    Args:
        hits (dict): Output from search_database (pattern -> record ids).

    Returns:
        None. Prints one tab-separated line per matched pattern (pattern,
        comma-joined ids, wildcard count), ordered by number of hits, or
        "No matches found." if every pattern came up empty.
    """
    lines = []
    # Order patterns from fewest to most hits; skip the ones with none.
    for pattern in sorted(hits, key=lambda p: len(hits[p])):
        record_ids = hits[pattern]
        if record_ids:
            lines.append("{}\t{}\t{}\n".format(
                pattern, ",".join(record_ids), pattern.count("X")))
    if lines:
        print("".join(lines))
    else:
        print("No matches found.")
if __name__ == "__main__": | |
parser = argparse.ArgumentParser(description="Pattern finding") | |
parser.add_argument("-l", "--loci", required=True, | |
help="Path to file containing loci") | |
parser.add_argument("-f", "--file", required=True, | |
help="Path to file containing nucleotide calls") | |
parser.add_argument("-d", "--database", required=True, | |
help="Path to database to search for patterns") | |
parser.add_argument("-i", "--ignore", default=0, type=float, | |
help="Minimum percentage threshold (between 0 and 1) " + | |
"of a nucleotide for it to be considered in pattern " + | |
"calculation.") | |
args = parser.parse_args() | |
main(args.loci, args.file, args.database, args.ignore) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment