Skip to content

Instantly share code, notes, and snippets.

@JoshGutman
Created August 8, 2018 10:33
Show Gist options
  • Save JoshGutman/b20a5f9d5fb18265ce3ba22737b66ea4 to your computer and use it in GitHub Desktop.
Save JoshGutman/b20a5f9d5fb18265ce3ba22737b66ea4 to your computer and use it in GitHub Desktop.
"""Finds and searches for patterns of nucleotides given loci.
Given a file containing loci, a file containing nucleotide calls of those loci,
a database to search in, and optionally an ignore threshold, this program
will calculate all possible patterns of nucleotides and search for their
occurrences in the database.
"""
import itertools
import argparse
import re
from Bio import SeqIO
def main(loci_file, file, database, ignore):
"""Driver function.
Args:
loci_file (str): Path to a file containing one locus on each line.
file (str): Path to file containing nucleotide calls.
database (str): Path to file containing database.
ignore (float): Minimum percentage threshold of a nucleotide for it to
be considered in pattern calculation.
Returns:
None.
"""
loci = get_loci(loci_file)
loci_bases = get_bases(loci, file)
patterns = get_patterns(loci, loci_bases, ignore)
hits = search_database(patterns, database)
output_hits(hits)
def get_loci(loci_file):
"""Get loci from an input file.
Args:
loci_file (str): Path to a file containing one locus on each line.
Returns:
list of str: List of all loci.
"""
with open(loci_file, "r") as infile:
lines = infile.readlines()
return list(map(lambda s: s.strip(), lines))
def get_bases(loci, file):
"""Gets all bases that occur in each locus's nucleotide call.
Args:
loci (list of str): List of all loci.
file (str): Path to file containing nucleotide calls.
Returns:
dict:
key (str): Locus name.
value (dict):
key (str): Base.
value (int): Number of occurrences of that base.
Notes:
Iterates over `file` one time. Rather than searching for multiple
substrings in `file`, which could be very inefficient, each locus name
in `file` is checked for existence in a dictionary containing the
desired loci, with each check being O(1). This results in a runtime of
O(N), where N is the number of lines in `file`.
"""
loci_bases = {key: {"A": 0, "C": 0, "G": 0, "T": 0} for key in loci}
with open(file, "r") as infile:
for line in infile:
fields = line.split("\t")
locus = "\t".join(fields[:2])
if locus in loci_bases:
for base in fields[4].upper():
if base in loci_bases[locus]:
loci_bases[locus][base] += 1
return loci_bases
def get_patterns(loci, loci_bases, ignore_threshold):
"""Calculates all possible patterns of bases.
Args:
loci (list of str): List of all loci.
loci_bases (dict): Output of get_bases.
ignore_threshold (float): Minimum percentage threshold of a nucleotide
for it to be considered in pattern calculation.
Returns:
list of str: List of all patterns.
"""
loci_base_list = {key: [] for key in loci}
for locus in loci:
total = sum([loci_bases[locus][base] for base in loci_bases[locus]])
for base in loci_bases[locus]:
if total and loci_bases[locus][base] / total > ignore_threshold:
loci_base_list[locus].append(base)
# If locus did not occur in nucleotide call file, give it an "X"
if not loci_base_list[locus]:
loci_base_list[locus].append("X")
product_input_list = []
for locus in loci:
product_input_list.append(loci_base_list[locus])
for pattern in itertools.product(*product_input_list):
yield "".join(pattern)
def search_database(patterns, database):
"""Searches database for patterns using wildcards in re.
Args:
patterns (list of str): List of all patterns.
database (str): Path to database FASTA file.
Returns:
dict:
key (str): Pattern.
value (list of str): FASTA headers where pattern occurs.
Notes:
Efficiency is O(N * M), where N is number of records in database and
M is number of patterns.
"""
hits = {key: [] for key in patterns}
for record in SeqIO.parse(database, "fasta"):
sequence = str(record.seq)
for pattern in patterns:
if re.search(pattern.replace("X", "."), sequence):
hits[pattern].append(str(record.id))
return hits
def output_hits(hits):
"""Outputs the hits found in the database.
Args:
hits (dict): Output from search_database.
Returns:
None.
Prints to stdout.
"""
# Get rid of patterns with no hits and order output by number of hits
order = filter(lambda x: len(hits[x]),
sorted(list(hits.keys()), key=lambda x: len(hits[x])))
out = ""
for hit in order:
out += "{}\t{}\t{}\n".format(hit, ",".join(hits[hit]), hit.count("X"))
if out:
print(out)
else:
print("No matches found.")
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Pattern finding")
parser.add_argument("-l", "--loci", required=True,
help="Path to file containing loci")
parser.add_argument("-f", "--file", required=True,
help="Path to file containing nucleotide calls")
parser.add_argument("-d", "--database", required=True,
help="Path to database to search for patterns")
parser.add_argument("-i", "--ignore", default=0, type=float,
help="Minimum percentage threshold (between 0 and 1) " +
"of a nucleotide for it to be considered in pattern " +
"calculation.")
args = parser.parse_args()
main(args.loci, args.file, args.database, args.ignore)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment