import os
import sys
import gzip
import io
import tempfile
from Bio import SeqIO, bgzf
import pysam
import boto3
# Logger
import logging
logger = logging.getLogger(__name__)
# Boto | S3
s3 = boto3.resource('s3')
s3client = boto3.client('s3')
#stream = io.BytesIO(s3.get_object(Bucket=bucket, Key=key)['Body'].read()) # Type check some of this please
# with open("example.bam", 'rb') as ifile:
# with bgzf.BgzfReader(mode='r', fileobj=ifile) as bamfh:
# print(bamfh.readline().rstrip())
alignment_suffixes = [".sam", ".bam", ".cram"]
s3prefix = "s3://"
class Reader:
def __init__(self, alignment: io.IOBase, reference: io.IOBase):
if not isinstance(alignment, io.IOBase):
raise TypeError("ncbi.reader.Reader expects a 'alignment' file as its first positional argument")
elif not isinstance(reference, io.IOBase):
raise TypeError("ncbi.reader.Reader expects a 'reference' Fasta file as its second positional argument")
fasta_data = self.__validate_fasta(
self.reference_file = fasta_data["filepath"]
self.__reference_is_temporary_file = fasta_data["temporary"]
self.reference_lengths = fasta_data["lengths"]
alignment_data = self.__validate_alignment(, self.reference_lengths)
self.alignment_file = alignment_data["indexed"]
self.__alignment_is_temporary_file = alignment_data["temporary"]
self.alignment = pysam.AlignmentFile(self.alignment_file)
def __exit__(self, exc_type, exc_value, traceback):
if self.__alignment_is_temporary_file:
for f in glob.glob(self.alignment_file + "*"):
logger.debug(" Unlinking alignment/index file '{0}'...".format(f))
if self.__reference_is_temporary_file:
logger.debug(" Unlinking reference file '{0}'...".format(self.reference_file))
def __s3_file_download(self, seqpath):
Note: the file will be downloaded into a temporary file that needs to be deleted afterwards
It will create the temporary file with respect to the TMP bash variable 'export TMP=/some/temporary/location'
:param seqpath: The s3 identifier of a object. 's3://bucket/example.fasta'
:type seqpath: str
:returns: The location of a downloaded gennomic Fasta file
:rtype: str
if type(seqpath) is not str:
raise TypeError("ngsci.reader.__s3_file_download expects a str 'seqpath' as its first positional argument")
elif seqpath[0:5] != s3prefix:
raise TypeError("ngsci.reader.__s3_file_download expects a s3 object reference its first positional argument. e.g. 's3://bucket/example.txt'")
seqpath = seqpath.lstrip(s3prefix)
pathsegs =seqpath.split('/')
bucket = pathsegs.pop(0)
key = pathsegs.join('/')
if seqpath[-3:] == ".gz":
suffix = '.' + sepath.split('.')[-2:].join('.')
suffix = os.path.splitext(seqpath)[1]
filepath = tempfile.NamedTemporaryFile(mode='w+b', suffix=suffix, delete=False)"Downloading '{0}' => '{1}'...".format(seqpath,
obj = s3.Object(bucket, key)
def __validate_fasta(self, seqpath):
Note: the function returns a NamedTemporaryFile if the input is an s3 object. This needs to be os.unlink'd
:param seqpath:
:returns: {lengths: {seqid: seqlen, ...}, filepath: 'path/to/example.fasta', temporary: False} Returns a dictionary of sequence objects, the filepath, and whether the
if type(seqpath) is not str:
raise TypeError("ngsci.reader.__validate_fasta expects a str 'seqpath' as its first positional argument")
elif (seqpath[-3:] != ".fa" and seqpath[-6:] != ".fa.gz" and seqpath[-6:] != ".fasta" and seqpath[-9:] != ".fasta.gz"):
raise TypeError("ngsci.reader.__validate_fasta expects the filepath to have a '.fa', '.fasta', or gzipped version of these format")
result = {
"lengths": [],
"filepath": seqpath,
"temporary": False
if seqpath[0:5] == "s3://":
localfasta = self.__s3_file_download(seqpath)
result["filepath"] = localfasta
result["temporary"] = True
elif not os.path.exists(result["filepath"]) or not os.access(result["filepath"], os.R_OK):
raise os.error("ngsci.reader.read_fasta expects the file to be accessible on the filesystem")
logger.debug("Reading reference sequence lengths from fasta file '{0}'...".format(result["filepath"]))
if seqpath[-3:] == ".gz":
with open(result["filepath"], 'rb') as ifile:
with, mode='rt') as data:
result["lengths"] = dict([(, len(x.seq)) for x in list(SeqIO.parse(data, 'fasta'))])
else: # Uncompressed fasta
with open(result["filepath"], 'r') as ifile:
result["lengths"] = dict([(, len(x.seq)) for x in list(SeqIO.parse(ifile, 'fasta'))])
except Exception as e:
return result
def __validate_alignment(self, seqpath, lengths):
if type(seqpath) is not str:
raise TypeError("ngsci.reader.__validate_alignment expects a str 'seqpath' as its first positional argument")
elif type(lengths) is not dict or not len(lengths) > 0:
raise TypeError("ngsci.reader.__validate_alignment expects a list of tuples as its second positional argument")
suffix = os.path.splitext(seqpath)[1]
if suffix not in alignment_suffixes:
raise TypeError("ngsci.reader.__validate_alignment expects the sequence file to have a '.sam', '.bam', or '.cram' suffix")
mode=dict(zip(alignment_suffixes, ['r', 'rb', 'rc']))[suffix]
result = {
"filepath": seqpath,
"sorted": None,
"indexed": None,
"temporary": False
if seqpath[0:5] == "s3://":
localsam = self.__s3_file_download(seqpath)
result["filepath"] = localsam
result["temporary"] = True
elif not os.path.exists(result["filepath"]) or not os.access(result["filepath"], os.R_OK):
raise os.error("ngsci.reader.__validate_alignment expects the file to be accessible on the filesystem")
logger.debug("Verifying index status of '{0}'...".format(result["filepath"]))
samfile = pysam.AlignmentFile(result["filepath"], mode)
if samfile.check_index():
result["sorted"] = result["filepath"]
result["indexed"] = result["filepath"]
except (ValueError, AttributeError) as e:
logger.debug("Alignnment file '{0}' is not indexed... sorting and indexing now".format(result["filepath"]))
sorted_file = tempfile.NamedTemporaryFile(suffix=suffix, mode='w+b', delete=False)
result["sorted"] = # The filepath attribute refers to the temporary sorted + indexed file
pysam.sort('-o', result["sorted"], result["filepath"]) # Sort the original filepath into the sorted file
if result["temporary"] is True:
os.unlink(result["filepath"]) # Delete the old temporary file
result["filepath"] = result["sorted"]
result["temporary"] = True
result["indexed"] = result["sorted"]
samseqs = dict(list(zip(samfile.references, samfile.lengths)))
logger.debug("Cross-referencing reference sequences/lengths with alignment file sequences and lengths...")
for k, l in samseqs.items():
if k not in lengths.keys():
raise AttributeError("Alignment file '{0}' references fasta sequence '{1}' which was not provided in the fasta file".format(result["indexed"], k))
elif l != lengths[k]:
raise AttributeError("Alignment file '{0}' with length {1} references a fasta sequence '{2}' with length {3}".format(result["indexed"], l, k, lengths[k]))
except AttributeError as e:
logger.error("Enncountered an error while sorting and indexing the alignment file '{0}'".format(result["filepath"]))
if result["temporary"] is True:
if result["filepath"] != seqpath:
sorted_alignment = result["sorted"]
indexed_alignment = result["indexed"]
if sorted_alignment is not None and os.path.exists(sorted_alignment) and sorted_alignment != seqpath:
logger.debug("Removing temporary sorted alignment file '{0}'...".format(sorted_alignment))
if indexed_alignment is not None and os.path.exists(indexed_alignment) and indexed_alignment != seqpath:
logger.debug("Removing temporary indexed alignment file '{0}'...".format(indexed_alignment))
return result
def read_blocks(self, reference, start, end):
"""This will read a chunk of reads as a pileup column. Usage as follows
for pileupcolumn in reader_instance.read_blocks(reference, 1, 200):
for pileupread in pileupcolumn.pileups:
if not pileupread.is_del and not pileupread.is_refskip:
# Logic for strand specificity and paired end
# do something with pileupread.alignment
# pileupread.alignment.reference_start
# pileupread.alignment.reference_end
# strand-specific
# :::: Filter criteria ::::
# pileupread.alignment.is_proper_pair
# :::: Conditionals
# pileupread.alignment.is_read1 and not is_reverse # FR pair, F is read one on forward strand
# pileupread.alignment.is_read1 and is_reverse # FR pair, F is read one on reverse strand
# unstranded
# :::: Filter criteria ::::
# :::: Conditionals
# pileup.alignment.reference_start
:param reference:
:param start:
:param end:
if type(reference) is not str:
raise TypeError("ngsci.reader.Reader expects a str 'reference' as its first positional argument")
elif type(start) is not int:
raise TypeError("ngsci.reader.Reader expects a int 'start' as its second positional argument")
elif type(end) is not int:
raise TypeError("ngsci.reader.Reader expects a int 'end' as its third positional argument")
elif reference not in self.reference_lengths.keys():
raise KeyError("ngsci.reader.Reader.read_blocks expects the reference argument to be present in the reference Fasta")
return self.alignment.pileup(reference, start, end)
