Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Fasta + Bam file validation with S3 support
import os
import sys
import gzip
import io
import tempfile
from Bio import SeqIO, bgzf
import pysam
import boto3
# Logger
import logging
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
# Boto | S3
s3 = boto3.resource('s3')
s3client = boto3.client('s3')
#stream = io.BytesIO(s3.get_object(Bucket=bucket, Key=key)['Body'].read()) # Type check some of this please
# with open("example.bam", 'rb') as ifile:
# with bgzf.BgzfReader(mode='r', fileobj=ifile) as bamfh:
# print(bamfh.readline().rstrip())
alignment_suffixes = [".sam", ".bam", ".cram"]
s3prefix = "s3://"
class Reader:
def __init__(self, alignment: io.IOBase, reference: io.IOBase):
if not isinstance(alignment, io.IOBase):
raise TypeError("ncbi.reader.Reader expects a 'alignment' file as its first positional argument")
elif not isinstance(reference, io.IOBase):
raise TypeError("ncbi.reader.Reader expects a 'reference' Fasta file as its second positional argument")
alignment.close()
reference.close()
fasta_data = self.__validate_fasta(reference.name)
self.reference_file = fasta_data["filepath"]
self.__reference_is_temporary_file = fasta_data["temporary"]
self.reference_lengths = fasta_data["lengths"]
alignment_data = self.__validate_alignment(alignment.name, self.reference_lengths)
self.alignment_file = alignment_data["indexed"]
self.__alignment_is_temporary_file = alignment_data["temporary"]
self.alignment = pysam.AlignmentFile(self.alignment_file)
def __exit__(self, exc_type, exc_value, traceback):
self.alignment.close()
if self.__alignment_is_temporary_file:
for f in glob.glob(self.alignment_file + "*"):
logger.debug(" Unlinking alignment/index file '{0}'...".format(f))
os.unlink(f)
if self.__reference_is_temporary_file:
logger.debug(" Unlinking reference file '{0}'...".format(self.reference_file))
os.unlink(self.reference_file)
def __s3_file_download(self, seqpath):
"""
Note: the file will be downloaded into a temporary file that needs to be deleted afterwards
It will create the temporary file with respect to the TMP bash variable 'export TMP=/some/temporary/location'
:param seqpath: The s3 identifier of a object. 's3://bucket/example.fasta'
:type seqpath: str
:returns: The location of a downloaded gennomic Fasta file
:rtype: str
"""
if type(seqpath) is not str:
raise TypeError("ngsci.reader.__s3_file_download expects a str 'seqpath' as its first positional argument")
elif seqpath[0:5] != s3prefix:
raise TypeError("ngsci.reader.__s3_file_download expects a s3 object reference its first positional argument. e.g. 's3://bucket/example.txt'")
seqpath = seqpath.lstrip(s3prefix)
pathsegs =seqpath.split('/')
bucket = pathsegs.pop(0)
key = pathsegs.join('/')
if seqpath[-3:] == ".gz":
suffix = '.' + sepath.split('.')[-2:].join('.')
else:
suffix = os.path.splitext(seqpath)[1]
filepath = tempfile.NamedTemporaryFile(mode='w+b', suffix=suffix, delete=False)
logger.info("Downloading '{0}' => '{1}'...".format(seqpath, filepath.name))
obj = s3.Object(bucket, key)
obj.download_fileobj(filepath)
filepath.close()
return filepath.name
def __validate_fasta(self, seqpath):
"""
Note: the function returns a NamedTemporaryFile if the input is an s3 object. This needs to be os.unlink'd
:param seqpath:
:returns: {lengths: {seqid: seqlen, ...}, filepath: 'path/to/example.fasta', temporary: False} Returns a dictionary of sequence objects, the filepath, and whether the
:rtype:
"""
if type(seqpath) is not str:
raise TypeError("ngsci.reader.__validate_fasta expects a str 'seqpath' as its first positional argument")
elif (seqpath[-3:] != ".fa" and seqpath[-6:] != ".fa.gz" and seqpath[-6:] != ".fasta" and seqpath[-9:] != ".fasta.gz"):
raise TypeError("ngsci.reader.__validate_fasta expects the filepath to have a '.fa', '.fasta', or gzipped version of these format")
result = {
"lengths": [],
"filepath": seqpath,
"temporary": False
}
if seqpath[0:5] == "s3://":
localfasta = self.__s3_file_download(seqpath)
result["filepath"] = localfasta
result["temporary"] = True
elif not os.path.exists(result["filepath"]) or not os.access(result["filepath"], os.R_OK):
raise os.error("ngsci.reader.read_fasta expects the file to be accessible on the filesystem")
try:
logger.debug("Reading reference sequence lengths from fasta file '{0}'...".format(result["filepath"]))
if seqpath[-3:] == ".gz":
with open(result["filepath"], 'rb') as ifile:
with gzip.open(ifile, mode='rt') as data:
result["lengths"] = dict([(x.id, len(x.seq)) for x in list(SeqIO.parse(data, 'fasta'))])
else: # Uncompressed fasta
with open(result["filepath"], 'r') as ifile:
result["lengths"] = dict([(x.id, len(x.seq)) for x in list(SeqIO.parse(ifile, 'fasta'))])
except Exception as e:
logger.error(e)
sys.exit(1)
return result
def __validate_alignment(self, seqpath, lengths):
if type(seqpath) is not str:
raise TypeError("ngsci.reader.__validate_alignment expects a str 'seqpath' as its first positional argument")
elif type(lengths) is not dict or not len(lengths) > 0:
raise TypeError("ngsci.reader.__validate_alignment expects a list of tuples as its second positional argument")
suffix = os.path.splitext(seqpath)[1]
if suffix not in alignment_suffixes:
raise TypeError("ngsci.reader.__validate_alignment expects the sequence file to have a '.sam', '.bam', or '.cram' suffix")
else:
mode=dict(zip(alignment_suffixes, ['r', 'rb', 'rc']))[suffix]
result = {
"filepath": seqpath,
"sorted": None,
"indexed": None,
"temporary": False
}
if seqpath[0:5] == "s3://":
localsam = self.__s3_file_download(seqpath)
result["filepath"] = localsam
result["temporary"] = True
elif not os.path.exists(result["filepath"]) or not os.access(result["filepath"], os.R_OK):
raise os.error("ngsci.reader.__validate_alignment expects the file to be accessible on the filesystem")
try:
logger.debug("Verifying index status of '{0}'...".format(result["filepath"]))
samfile = pysam.AlignmentFile(result["filepath"], mode)
try:
if samfile.check_index():
result["sorted"] = result["filepath"]
result["indexed"] = result["filepath"]
samfile.close()
except (ValueError, AttributeError) as e:
#logger.debug(e)
logger.debug("Alignnment file '{0}' is not indexed... sorting and indexing now".format(result["filepath"]))
sorted_file = tempfile.NamedTemporaryFile(suffix=suffix, mode='w+b', delete=False)
sorted_file.close()
result["sorted"] = sorted_file.name # The filepath attribute refers to the temporary sorted + indexed file
pysam.sort('-o', result["sorted"], result["filepath"]) # Sort the original filepath into the sorted file
if result["temporary"] is True:
os.unlink(result["filepath"]) # Delete the old temporary file
result["filepath"] = result["sorted"]
result["temporary"] = True
pysam.index(result["sorted"])
result["indexed"] = result["sorted"]
samseqs = dict(list(zip(samfile.references, samfile.lengths)))
logger.debug("Cross-referencing reference sequences/lengths with alignment file sequences and lengths...")
for k, l in samseqs.items():
if k not in lengths.keys():
raise AttributeError("Alignment file '{0}' references fasta sequence '{1}' which was not provided in the fasta file".format(result["indexed"], k))
elif l != lengths[k]:
raise AttributeError("Alignment file '{0}' with length {1} references a fasta sequence '{2}' with length {3}".format(result["indexed"], l, k, lengths[k]))
samfile.close()
except AttributeError as e:
logger.error("Enncountered an error while sorting and indexing the alignment file '{0}'".format(result["filepath"]))
logger.debug(result)
logger.debug(se)
samfile.close()
if result["temporary"] is True:
if result["filepath"] != seqpath:
os.unlink(result["filepath"])
sorted_alignment = result["sorted"]
indexed_alignment = result["indexed"]
if sorted_alignment is not None and os.path.exists(sorted_alignment) and sorted_alignment != seqpath:
logger.debug("Removing temporary sorted alignment file '{0}'...".format(sorted_alignment))
os.unlink(sorted_alignment)
if indexed_alignment is not None and os.path.exists(indexed_alignment) and indexed_alignment != seqpath:
logger.debug("Removing temporary indexed alignment file '{0}'...".format(indexed_alignment))
os.unlink(indexed_alignment)
return result
def read_blocks(self, reference, start, end):
"""This will read a chunk of reads as a pileup column. Usage as follows
for pileupcolumn in reader_instance.read_blocks(reference, 1, 200):
for pileupread in pileupcolumn.pileups:
if not pileupread.is_del and not pileupread.is_refskip:
# Logic for strand specificity and paired end
# do something with pileupread.alignment
# pileupread.alignment.reference_start
# pileupread.alignment.reference_end
###################
# strand-specific
###################
# :::: Filter criteria ::::
# pileupread.alignment.is_proper_pair
# :::: Conditionals
# pileupread.alignment.is_read1 and not is_reverse # FR pair, F is read one on forward strand
# pileupread.alignment.is_read1 and is_reverse # FR pair, F is read one on reverse strand
# OTHER ALIGNMENT TYPES UNHANDLED
###################
# unstranded
###################
# :::: Filter criteria ::::
# OPTIONS?
# :::: Conditionals
# NONE
# pileup.alignment.reference_start
#
:param reference:
:param start:
:param end:
:returns:
:rtype:
"""
if type(reference) is not str:
raise TypeError("ngsci.reader.Reader expects a str 'reference' as its first positional argument")
elif type(start) is not int:
raise TypeError("ngsci.reader.Reader expects a int 'start' as its second positional argument")
elif type(end) is not int:
raise TypeError("ngsci.reader.Reader expects a int 'end' as its third positional argument")
elif reference not in self.reference_lengths.keys():
raise KeyError("ngsci.reader.Reader.read_blocks expects the reference argument to be present in the reference Fasta")
return self.alignment.pileup(reference, start, end)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.