Skip to content

Instantly share code, notes, and snippets.

@peterk87
Created June 1, 2021 22:14
Show Gist options
  • Save peterk87/de921ea877dbad280ea7b395312f71e8 to your computer and use it in GitHub Desktop.
Save peterk87/de921ea877dbad280ea7b395312f71e8 to your computer and use it in GitHub Desktop.
Rotate a circular contig to start at the same position as a reference sequence
#!/usr/bin/env python3
# coding: utf-8
import logging
import subprocess
from io import StringIO
from pathlib import Path
from typing import Optional
import sys
import pandas as pd
import typer
from Bio import SeqIO, Entrez
from Bio.SeqRecord import SeqRecord
from rich.logging import RichHandler
from rich.traceback import install
BLAST_COLS = [
('qaccver', 'str'),
('saccver', 'str'),
('stitle', 'str'),
('pident', 'float'),
('length', 'int'),
('mismatch', 'int'),
('gapopen', 'int'),
('qstart', 'int'),
('qend', 'int'),
('sstart', 'int'),
('send', 'int'),
('qlen', 'int'),
('slen', 'int'),
]
def main(assembly_fasta: Path,
outdir: Path = typer.Option(Path('rotate_contig-output/'), help='Output directory'),
reference_accession: str = typer.Option('AY548484', '-r', '--reference-accession',
help='NCBI accession of nucleotide sequence to use for BLAST search'),
end_lengths: int = typer.Option(1000, '-l', '--end-lengths',
help='Length of seq from ends of reference sequence to use for BLAST search'),
min_pident: float = typer.Option(95.0,
help='Min nucleotide BLAST % identity for reference ends subsequences search'),
entrez_email: str = typer.Option('example@email.com', help='NCBI Entrez email'),
force: bool = typer.Option(False, help='Force overwrite output')):
"""Rotate a circular contig to start at the same position as a reference sequence
Usage:
Specify your assembly fasta file and reference accession (e.g. "AY548484"):
$ python rotate_contig.py assembly.fasta -r AY548484
Rotated target contig will be output to "rotate_contig-output/rotated.fasta".
Requirements:
This script requires Python 3.6+ and the biopython, pandas, rich and typer Python modules to be installed:
$ pip install biopython pandas rich typer
"""
install(show_locals=True, width=120, word_wrap=True)
logging.basicConfig(format='%(message)s',
datefmt='[%Y-%m-%d %X]',
level=logging.DEBUG,
handlers=[RichHandler(rich_tracebacks=True,
tracebacks_show_locals=True)])
if outdir.exists() and not force:
logging.error(f'The output directory "{outdir}" already exists! Overwrite previous results with "--force"')
sys.exit(1)
if not outdir.exists():
outdir.mkdir(parents=True)
Entrez.email = entrez_email
logging.info(f'Fetching {reference_accession} from NCBI using Entrez API')
with Entrez.efetch(db='nucleotide',
rettype='fasta',
retmode='text',
id=reference_accession) as handle:
seq_rec: SeqRecord = SeqIO.read(handle, 'fasta')
logging.info(f'Fetched "{seq_rec.id} {seq_rec.description}" from NCBI (length={len(seq_rec)})')
ref_fasta = outdir / f'{seq_rec.id}.fasta'
SeqIO.write([seq_rec], ref_fasta, 'fasta')
logging.info(f'Wrote reference fasta to "{ref_fasta}"')
ends_fasta = write_ref_ends_fasta(end_lengths, outdir, seq_rec)
makeblastdb(assembly_fasta, outdir)
blastn_results = blastn(assembly_fasta, ends_fasta)
df_blast_ends_top = parse_blastn(blastn_results, min_pident)
contig_name = df_blast_ends_top.saccver.unique()[0]
asm_recs = {r.id: r for r in SeqIO.parse(assembly_fasta, 'fasta')}
contig_seq_record = asm_recs[contig_name]
left_end, right_end = get_rotated_ends(contig_seq_record, df_blast_ends_top)
if left_end and right_end:
output_fasta = write_rotated_fasta(contig_seq_record, outdir, left_end, right_end)
logging.info(f'Wrote rotated contig to "{output_fasta}"')
else:
logging.error(f'Could not find matches to both ends. Start seq = "{left_end}". End seq = "{right_end}"')
sys.exit(1)
def write_rotated_fasta(contig_seq_record, outdir, s1, s2):
rot_rec = SeqRecord((s1.seq + s2.seq), id=contig_seq_record.id, name=contig_seq_record.name)
output_fasta = outdir / 'rotated.fasta'
SeqIO.write([rot_rec], output_fasta, 'fasta')
return output_fasta
def write_ref_ends_fasta(end_lengths: int, outdir: Path, seq_rec: SeqRecord) -> Path:
start_seq = seq_rec.seq[0:end_lengths]
end_seq = seq_rec.seq[-end_lengths:]
start_rec_name = f'{seq_rec.id}|1:{len(start_seq)}'
start_rec = SeqRecord(start_seq, id=start_rec_name, name=start_rec_name)
rec_len = len(seq_rec.seq)
end_rec_name = f'{seq_rec.name}|{rec_len - len(end_seq)}:{rec_len}'
end_rec = SeqRecord(end_seq, id=end_rec_name, name=end_rec_name)
ends_fasta = outdir / f'{seq_rec.id}-{end_lengths}-ends.fasta'
SeqIO.write([start_rec, end_rec], ends_fasta, 'fasta')
return ends_fasta
def get_rotated_ends(
contig_seq_record: SeqRecord,
df_blast_ends_top: pd.DataFrame
) -> (Optional[SeqRecord], Optional[SeqRecord]):
left_end: Optional[SeqRecord] = None
right_end: Optional[SeqRecord] = None
for row in df_blast_ends_top.itertuples():
query_name, seq_range = row.qaccver.split('|')
seq_start_idx, seq_end_idx = [int(x) - 1 for x in seq_range.split(':')]
do_revcomp = False
if row.sstart > row.send:
logging.info(f'Reverse complement of subseq matching "{query_name}" ({seq_range}) needed')
do_revcomp = True
if seq_start_idx == 0:
left_end = contig_seq_record[0:max(row.sstart, row.send)]
if do_revcomp:
left_end = left_end.reverse_complement()
else:
right_end = contig_seq_record[min(row.sstart, row.send):]
if do_revcomp:
right_end: SeqRecord = right_end.reverse_complement()
return left_end, right_end
def parse_blastn(blastn_results: subprocess.CompletedProcess, min_pident: float = 95.0) -> pd.DataFrame:
df_blast_ends = pd.read_table(StringIO(blastn_results.stdout.decode()),
names=[x for x, _ in BLAST_COLS],
dtype={x: y for x, y in BLAST_COLS})
return df_blast_ends[df_blast_ends.pident >= min_pident]
def blastn(assembly_fasta: Path, ends_fasta: Path) -> subprocess.CompletedProcess:
blastn_cmd = f'blastn ' \
f'-query {ends_fasta.absolute()} ' \
f'-db {assembly_fasta.absolute()} ' \
f'-outfmt "6 {" ".join(x for x, _ in BLAST_COLS)}"'
logging.info(f'Running nucleotide BLAST: $ {blastn_cmd}')
return subprocess.run(
blastn_cmd,
check=True,
shell=True,
stdout=subprocess.PIPE,
)
def makeblastdb(assembly_fasta: Path, outdir: Path) -> None:
db_path = outdir / assembly_fasta.name
makeblastdb_cmd = f'makeblastdb ' \
f'-dbtype nucl ' \
f'-in {assembly_fasta.absolute()} ' \
f'-out {db_path.absolute()}'
logging.info(f'Making BLAST DB: $ {makeblastdb_cmd}')
subprocess.run(
makeblastdb_cmd,
check=True,
shell=True,
)
if __name__ == '__main__':
typer.run(main)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment