Skip to content

Instantly share code, notes, and snippets.

@sminot
Created September 8, 2017 20:52
Show Gist options
  • Save sminot/d61a88ef1a031f89ff0b1de400f7bcb5 to your computer and use it in GitHub Desktop.
Save sminot/d61a88ef1a031f89ff0b1de400f7bcb5 to your computer and use it in GitHub Desktop.
Compare protein FASTAs with BLASTP and output XLSX
#!/usr/bin/python
"""Given a set of protein FASTA files, perform pairwise comparison via BLAST, outputting an Excel spreadsheet."""
import os
import sys
import json
import subprocess
import pandas as pd
from collections import defaultdict
from Bio.SeqIO.FastaIO import SimpleFastaParser
helptext = """compare_proteins.py
This script will compare a set of amino acid sequences, providing
a summary of the percent identity shared across all samples.
Inputs:
1. Folder containing a set of amino acid sequences in FASTA format
2. Path to write Excel output
"""
if len(sys.argv) == 1:
assert False, helptext
assert len(sys.argv) == 3, "Usage: python compare_proteins.py <folder> <output_fp>"
folder = sys.argv[1]
output_fp = sys.argv[2]
assert os.path.exists(folder), "Input folder does not exist"
assert output_fp.endswith('.xlsx'), "Output path must end with .xlsx"
assert os.path.exists(output_fp) is False, "Output path already exists"
def blastp_compare_proteins(file1, file2):
"""Compare two protein FASTA files using BLASTP."""
# 1. Get the length and annotation of every protein
protein_info = {}
for header, sequence in SimpleFastaParser(open(file1, 'rt')):
name, desc = header.split(' ', 1)
protein_info[name] = {'length': float(len(sequence)), 'desc': desc}
# 2. Get the BLASTP output
p = subprocess.Popen(['blastp', '-query', file1,
'-subject', file2,
'-num_alignments', '1', # Only show the top alignment
'-outfmt', '7'],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
# Get the output
stdout, stderr = p.communicate()
# Fail if the exit code != 0
assert p.wait() == 0, stderr
# 3. Parse the BLASTP output
# All of the results are keyed by protein name
proteins = {}
# Read through the BLAST results
for line in stdout.split('\n'):
# Skip header lines
if len(line) <= 1 or line[0] == '#':
continue
# Break up the line into tab-delimited fields
fields = line.strip('\n').split('\t')
# Skip empty lines
if len(fields) < 12:
continue
protein_name = fields[0]
# Store the percent identity and percent coverage
pct_id = float(fields[2])
pct_cov = float(fields[3]) / protein_info[protein_name]['length']
# Add the annotation
protein_name = "{} {}".format(protein_name, protein_info[protein_name]['desc'])
# Skip the line if we've already recorded the best hit for this protein
if protein_name in proteins:
continue
# Report the percent identity * percent coverage
proteins[protein_name] = pct_id * pct_cov
return proteins
# Store all of the pairwise identity information in a dict
# {
# 'sample1': {
# 'sample2': {
# 'gene1': (<coverage & identity>)
# },
# {
# 'gene2': (<coverage & identity>)
# }
# }
# }
dat = defaultdict(dict)
for file1 in os.listdir(folder):
for file2 in os.listdir(folder):
if file1 == file2:
continue
dat[file1][file2] = blastp_compare_proteins("{}/{}".format(folder, file1),
"{}/{}".format(folder, file2))
# Reformat this as a spreadsheet
# Tabs are named by the query file
# Columns are named by the subject file
# Rows are named by the annotated protein
with pd.ExcelWriter(output_fp) as writer:
for file1, df in dat.items():
df = pd.DataFrame(df).fillna(0)
df.to_excel(writer, file1)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment