Skip to content

Instantly share code, notes, and snippets.

@meren meren/fastaunique
Created Feb 9, 2016

Embed
What would you like to do?
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Questions and concerns: A. Murat Eren <meren@mbl.edu>
#
import sys
import hashlib
class FastaLibError(Exception):
def __init__(self, e = None):
Exception.__init__(self)
while 1:
if e.find(" ") > -1:
e = e.replace(" ", " ")
else:
break
self.e = e
return
def __str__(self):
return 'Fasta Lib Error: %s' % self.e
class SequenceSource():
def __init__(self, fasta_file_path, lazy_init = True, unique = False, allow_mixed_case = False):
self.fasta_file_path = fasta_file_path
self.name = None
self.lazy_init = lazy_init
self.allow_mixed_case = allow_mixed_case
self.pos = 0
self.id = None
self.seq = None
self.ids = []
self.unique = unique
self.unique_hash_dict = {}
self.unique_hash_list = []
self.unique_next_hash = 0
self.file_pointer = open(self.fasta_file_path)
if not self.file_pointer.read(1) == '>':
raise FastaLibError, "File '%s' does not seem to be a FASTA file." % self.fasta_file_path
self.file_pointer.seek(0)
if self.lazy_init:
self.total_seq = None
else:
self.total_seq = len([l for l in self.file_pointer.readlines() if l.startswith('>')])
self.reset()
if self.unique:
self.init_unique_hash()
def init_unique_hash(self):
while self.next_regular():
hash = hashlib.sha1(self.seq.upper()).hexdigest()
if hash in self.unique_hash_dict:
self.unique_hash_dict[hash]['ids'].append(self.id)
self.unique_hash_dict[hash]['count'] += 1
else:
self.unique_hash_dict[hash] = {'id' : self.id,
'ids': [self.id],
'seq': self.seq,
'count': 1}
self.unique_hash_list = [i[1] for i in sorted([(self.unique_hash_dict[hash]['count'], hash)\
for hash in self.unique_hash_dict], reverse = True)]
self.total_unique = len(self.unique_hash_dict)
self.reset()
def next(self):
if self.unique:
return self.next_unique()
else:
return self.next_regular()
def next_unique(self):
if self.unique:
if self.total_unique > 0 and self.pos < self.total_unique:
hash_entry = self.unique_hash_dict[self.unique_hash_list[self.pos]]
self.pos += 1
self.seq = hash_entry['seq'] if self.allow_mixed_case else hash_entry['seq'].upper()
self.id = hash_entry['id']
self.ids = hash_entry['ids']
return True
else:
return False
else:
return False
def next_regular(self):
self.seq = None
self.id = self.file_pointer.readline()[1:].strip()
sequence = ''
while 1:
line = self.file_pointer.readline()
if not line:
if len(sequence):
self.seq = sequence
self.pos += 1
return True
else:
return False
if line.startswith('>'):
self.file_pointer.seek(self.file_pointer.tell() - len(line))
break
sequence += line.strip()
self.seq = sequence if self.allow_mixed_case else sequence.upper()
self.pos += 1
return True
def get_seq_by_read_id(self, read_id):
self.reset()
while self.next():
if self.id == read_id:
return self.seq
return False
def close(self):
self.file_pointer.close()
def reset(self):
self.pos = 0
self.id = None
self.seq = None
self.ids = []
self.file_pointer.seek(0)
class FastaOutput:
def __init__(self, output_file_path):
self.output_file_path = output_file_path
self.output_file_obj = open(output_file_path, 'w')
def store(self, entry, split = True, store_frequencies = True):
if entry.unique and store_frequencies:
self.write_id('%s|%s' % (entry.id, 'frequency:%d' % len(entry.ids)))
else:
self.write_id(entry.id)
self.write_seq(entry.seq, split)
def write_id(self, id):
self.output_file_obj.write('>%s\n' % id)
def write_seq(self, seq, split = True):
if split:
seq = self.split(seq)
self.output_file_obj.write('%s\n' % seq)
def split(self, sequence, piece_length = 80):
ticks = range(0, len(sequence), piece_length) + [len(sequence)]
return '\n'.join([sequence[ticks[x]:ticks[x + 1]] for x in range(0, len(ticks) - 1)])
def close(self):
self.output_file_obj.close()
def main(args):
try:
input = SequenceSource(args.input_fasta, unique = True)
except IOError:
print 'Error: File does not exist, or you do not have the right permissions to read it: "%s"'\
% args.input_fasta
sys.exit(-1)
if args.output_fasta:
output_file_path = args.output_fasta
else:
output_file_path = args.input_fasta + '.unique'
try:
output = FastaOutput(output_file_path)
except IOError:
print 'Error: You have no permission to write destination: "%s"'\
% output_file_path
sys.exit(-1)
if args.names_file:
names_file_path = args.names_file
else:
names_file_path = output_file_path + '.names'
try:
names = open(names_file_path, 'w')
except IOError:
print 'Error: You have no permission to write destination: "%s"'\
% names_file_path
sys.exit(-1)
while input.next():
if args.dont_truncate_sequences:
if args.dont_include_frequencies:
output.store(input, split = False, store_frequencies = False)
else:
output.store(input, split = False)
else:
if args.dont_include_frequencies:
output.store(input, store_frequencies = False)
else:
output.store(input)
names.write('%s\t%s\n' % (input.id, ','.join(input.ids)))
output.close()
if __name__ == '__main__':
import argparse
parser = argparse.ArgumentParser(description='Unique sequences in a FASTA file')
parser.add_argument('input_fasta', metavar = 'INPUT_FASTA',
help = 'Sequences file in FASTA format')
parser.add_argument('-o', '--output-fasta', metavar = 'OUTPUT_FASTA', default = None,
help = 'FASTA file to store unique sequences')
parser.add_argument('-n', '--names-file', metavar = 'NAMES_FILE', default = None,
help = 'FASTA fie to store unique sequences')
parser.add_argument('-t', '--dont-truncate-sequences', action = 'store_true', default = False,
help = 'When present, sequences would not be truncated into multiple lines when creating FASTA')
parser.add_argument('-f', '--dont-include-frequencies', action = 'store_true', default = False,
help = 'When present, sequence frequencies would not appear in the defline')
args = parser.parse_args()
main(args)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.