Skip to content

Instantly share code, notes, and snippets.

@jasonsahl
Last active December 12, 2015 07:08
Show Gist options
  • Save jasonsahl/33b0d9a8e3ac035bb92c to your computer and use it in GitHub Desktop.
Save jasonsahl/33b0d9a8e3ac035bb92c to your computer and use it in GitHub Desktop.
Retrieve MLST profiles from a set of genome assemblies. If the query length divided by the reference length is below a given threshold (default is 0.95), a "T", for truncation, is reported. If no BLAST alignment is present, an "X" is returned.
#!/usr/bin/env python
"""calculates MLST types from assemblies using BLAST.
If the gene is truncated, it will report a "T" and if
the gene has no blast hit, it will report a "X".
Your reference allele names must all end in "fasta"
and must contain a "_" between gene name and number.
The only external dependency is blastall - tested version
is 2.2.25. Tested version of Python is 2.7 and will
likely not work with Python 3"""
import optparse
import glob
import subprocess
import os
import re
import sys
def get_gene_order(reference):
firstLine = open(reference).readline()
fields = firstLine.split()
ordered = fields[1:]
return ordered
def get_seq_name(in_fasta):
"""used for renaming the sequences"""
return os.path.basename(in_fasta)
def run_blast(directory, fastas, ordered, reference, trunc, penalty, reward):
genome_names = [ ]
print "blast starting"
for infile in glob.glob(os.path.join(directory, '*.fasta')):
for genome in glob.glob(os.path.join(fastas, '*.fasta')):
name = get_seq_name(infile)
tmp_name = get_seq_name(genome)
genome_name = tmp_name.replace('.fasta','')
genome_names.append(genome_name)
outfile = open("%s_blast.out" % name, "w")
subprocess.check_call("formatdb -i %s -p F" % genome, shell=True)
cmd = ["blastall",
"-p", "blastn",
"-i", infile,
"-d", genome,
"-o", "%s_blast_%s.out" % (name, genome_name),
"-e", "0.0001",
"-F", "F",
"-q", str(penalty),
"-r", str(reward),
"-m", "8"]
subprocess.check_call(cmd)
new_infile = open("%s_blast_%s.out" % (name, genome_name))
new_outfile = open("%s_blast_%s_fixed.out" % (name, genome_name), "w")
for line in new_infile:
fields=line.split()
print >> new_outfile, "\t".join(fields)
new_infile.close()
new_outfile.close()
print "blast finished"
nr=[x for i, x in enumerate(genome_names) if x not in genome_names[i+1:]]
outfile = open("profile.txt", "a")
pad_order = list(ordered)
pad_order.insert(0,"genome")
pad_order.insert(8,"ST")
print >> outfile, "\t".join(pad_order)
for genome in nr:
alleles = [ ]
del fields[0:len(fields)]
for marker in ordered:
new_list = [line.strip().split("\t") for line in open("%s.fasta_blast_%s_fixed.out" % (marker, genome), "rU")]
new_list=sorted(new_list, key=lambda v: float(v[-1]), reverse=True)
try:
fields=new_list[0]
if int(fields[3])/int(fields[7])>=trunc and float(fields[2])>0.98:
alleles.append(fields[0].split("_")[1])
elif int(fields[3])/int(fields[7])>=trunc and float(fields[2])<=0.98:
alleles.append("N")
else:
alleles.append("T")
except:
alleles.append("X")
sys.exc_clear()
del fields[0:len(fields)]
del new_list[0:len(new_list)]
alleles.insert(0, genome)
in_ref = open(reference, "rU")
for line in in_ref:
ref_fields=line.split()
sum = [ ]
for i in range(1,8):
if ref_fields[i]==alleles[i]:
sum.append(i)
if int(len(sum))==7:
alleles.insert(8, ref_fields[0])
print >> outfile, "\t".join(alleles)
def test_file(option, opt_str, value, parser):
try:
with open(value): setattr(parser.values, option.dest, value)
except IOError:
print '%s cannot be opened' % option
sys.exit()
def test_dir(option, opt_str, value, parser):
if os.path.exists(value):
setattr(parser.values, option.dest, value)
else:
print "%s cannot be found" % option
sys.exit()
def main(directory, reference, fastas, trunc, penalty, reward):
print "Path to blastall"
aa = subprocess.call(['which', 'blastall'])
if aa == 0:
pass
else:
print "Blast must be in your path as blastall"
sys.exit()
ordered = get_gene_order(reference)
run_blast(directory, fastas, ordered, reference, trunc, penalty, reward)
subprocess.check_call("rm *.out", shell=True)
if __name__ == "__main__":
usage="usage: %prog [options]"
parser = optparse.OptionParser(usage=usage)
parser.add_option("-d", "--seq_dir", dest="directory",
help="/path/to/reference_seq_directory [REQUIRED]",
action="callback", callback=test_dir, type="string")
parser.add_option("-r", "--ref file", dest="reference",
help="/path/to/ST_reference_file [REQUIRED]",
action="callback", callback=test_file, type="string")
parser.add_option("-f", "--fasta dir", dest="fastas",
help="/path/to/your_genomes [REQUIRED]",
action="callback", callback=test_dir, type="string")
parser.add_option("-t", "--truncated value", dest="trunc",
help="gene is considered truncated if below this value [default = 0.95]",
default="0.95", type="float")
parser.add_option("-q", "--penalty", dest="penalty",
help="penalty for a BLAST mismatch. Defaults to -4, see BLAST documentation \
for valid q/r pairings",
default="-4", type="int")
parser.add_option("-w", "--reward", dest="reward",
help="reward for a BLAST match. Defaults to 5, see BLAST documentation \
for valid q/r pairings",
default="5", type="int")
options, args = parser.parse_args()
mandatories = ["directory", "reference", "fastas"]
for m in mandatories:
if not getattr(options, m, None):
print "\nMust provide %s.\n" %m
parser.print_help()
exit(-1)
main(options.directory, options.reference, options.fastas, options.trunc, options.penalty, options.reward)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment