Created September 8, 2017 20:52
Compare protein FASTAs with BLASTP and output XLSX
"""Given a set of protein FASTA files, perform pairwise comparison via BLAST, outputting an Excel spreadsheet."""
import os
import sys
import json
import subprocess
import pandas as pd
from collections import defaultdict
from Bio.SeqIO.FastaIO import SimpleFastaParser
helptext = """
This script will compare a set of amino acid sequences, providing
a summary of the percent identity shared across all samples.
1. Folder containing a set of amino acid sequences in FASTA format
2. Path to write Excel output
if len(sys.argv) == 1:
assert False, helptext
assert len(sys.argv) == 3, "Usage: python <folder> <output_fp>"
folder = sys.argv[1]
output_fp = sys.argv[2]
assert os.path.exists(folder), "Input folder does not exist"
assert output_fp.endswith('.xlsx'), "Output path must end with .xlsx"
assert os.path.exists(output_fp) is False, "Output path already exists"
def blastp_compare_proteins(file1, file2):
"""Compare two protein FASTA files using BLASTP."""
# 1. Get the length and annotation of every protein
protein_info = {}
for header, sequence in SimpleFastaParser(open(file1, 'rt')):
name, desc = header.split(' ', 1)
protein_info[name] = {'length': float(len(sequence)), 'desc': desc}
# 2. Get the BLASTP output
p = subprocess.Popen(['blastp', '-query', file1,
'-subject', file2,
'-num_alignments', '1', # Only show the top alignment
'-outfmt', '7'],
# Get the output
stdout, stderr = p.communicate()
# Fail if the exit code != 0
assert p.wait() == 0, stderr
# 3. Parse the BLASTP output
# All of the results are keyed by protein name
proteins = {}
# Read through the BLAST results
for line in stdout.split('\n'):
# Skip header lines
if len(line) <= 1 or line[0] == '#':
# Break up the line into tab-delimited fields
fields = line.strip('\n').split('\t')
# Skip empty lines
if len(fields) < 12:
protein_name = fields[0]
# Store the percent identity and percent coverage
pct_id = float(fields[2])
pct_cov = float(fields[3]) / protein_info[protein_name]['length']
# Add the annotation
protein_name = "{} {}".format(protein_name, protein_info[protein_name]['desc'])
# Skip the line if we've already recorded the best hit for this protein
if protein_name in proteins:
# Report the percent identity * percent coverage
proteins[protein_name] = pct_id * pct_cov
return proteins
# Store all of the pairwise identity information in a dict
# {
# 'sample1': {
# 'sample2': {
# 'gene1': (<coverage & identity>)
# },
# {
# 'gene2': (<coverage & identity>)
# }
# }
# }
dat = defaultdict(dict)
for file1 in os.listdir(folder):
for file2 in os.listdir(folder):
if file1 == file2:
dat[file1][file2] = blastp_compare_proteins("{}/{}".format(folder, file1),
"{}/{}".format(folder, file2))
# Reformat this as a spreadsheet
# Tabs are named by the query file
# Columns are named by the subject file
# Rows are named by the annotated protein
with pd.ExcelWriter(output_fp) as writer:
for file1, df in dat.items():
df = pd.DataFrame(df).fillna(0)
df.to_excel(writer, file1)
