Created
February 9, 2016 03:57
-
-
Save meren/abd5c6f014da9aa554a4 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# -*- coding: utf-8 -*- | |
# | |
# Questions and concerns: A. Murat Eren <meren@mbl.edu> | |
# | |
import sys | |
import hashlib | |
class FastaLibError(Exception): | |
def __init__(self, e = None): | |
Exception.__init__(self) | |
while 1: | |
if e.find(" ") > -1: | |
e = e.replace(" ", " ") | |
else: | |
break | |
self.e = e | |
return | |
def __str__(self): | |
return 'Fasta Lib Error: %s' % self.e | |
class SequenceSource(): | |
def __init__(self, fasta_file_path, lazy_init = True, unique = False, allow_mixed_case = False): | |
self.fasta_file_path = fasta_file_path | |
self.name = None | |
self.lazy_init = lazy_init | |
self.allow_mixed_case = allow_mixed_case | |
self.pos = 0 | |
self.id = None | |
self.seq = None | |
self.ids = [] | |
self.unique = unique | |
self.unique_hash_dict = {} | |
self.unique_hash_list = [] | |
self.unique_next_hash = 0 | |
self.file_pointer = open(self.fasta_file_path) | |
if not self.file_pointer.read(1) == '>': | |
raise FastaLibError, "File '%s' does not seem to be a FASTA file." % self.fasta_file_path | |
self.file_pointer.seek(0) | |
if self.lazy_init: | |
self.total_seq = None | |
else: | |
self.total_seq = len([l for l in self.file_pointer.readlines() if l.startswith('>')]) | |
self.reset() | |
if self.unique: | |
self.init_unique_hash() | |
def init_unique_hash(self): | |
while self.next_regular(): | |
hash = hashlib.sha1(self.seq.upper()).hexdigest() | |
if hash in self.unique_hash_dict: | |
self.unique_hash_dict[hash]['ids'].append(self.id) | |
self.unique_hash_dict[hash]['count'] += 1 | |
else: | |
self.unique_hash_dict[hash] = {'id' : self.id, | |
'ids': [self.id], | |
'seq': self.seq, | |
'count': 1} | |
self.unique_hash_list = [i[1] for i in sorted([(self.unique_hash_dict[hash]['count'], hash)\ | |
for hash in self.unique_hash_dict], reverse = True)] | |
self.total_unique = len(self.unique_hash_dict) | |
self.reset() | |
def next(self): | |
if self.unique: | |
return self.next_unique() | |
else: | |
return self.next_regular() | |
def next_unique(self): | |
if self.unique: | |
if self.total_unique > 0 and self.pos < self.total_unique: | |
hash_entry = self.unique_hash_dict[self.unique_hash_list[self.pos]] | |
self.pos += 1 | |
self.seq = hash_entry['seq'] if self.allow_mixed_case else hash_entry['seq'].upper() | |
self.id = hash_entry['id'] | |
self.ids = hash_entry['ids'] | |
return True | |
else: | |
return False | |
else: | |
return False | |
def next_regular(self): | |
self.seq = None | |
self.id = self.file_pointer.readline()[1:].strip() | |
sequence = '' | |
while 1: | |
line = self.file_pointer.readline() | |
if not line: | |
if len(sequence): | |
self.seq = sequence | |
self.pos += 1 | |
return True | |
else: | |
return False | |
if line.startswith('>'): | |
self.file_pointer.seek(self.file_pointer.tell() - len(line)) | |
break | |
sequence += line.strip() | |
self.seq = sequence if self.allow_mixed_case else sequence.upper() | |
self.pos += 1 | |
return True | |
def get_seq_by_read_id(self, read_id): | |
self.reset() | |
while self.next(): | |
if self.id == read_id: | |
return self.seq | |
return False | |
def close(self): | |
self.file_pointer.close() | |
def reset(self): | |
self.pos = 0 | |
self.id = None | |
self.seq = None | |
self.ids = [] | |
self.file_pointer.seek(0) | |
class FastaOutput: | |
def __init__(self, output_file_path): | |
self.output_file_path = output_file_path | |
self.output_file_obj = open(output_file_path, 'w') | |
def store(self, entry, split = True, store_frequencies = True): | |
if entry.unique and store_frequencies: | |
self.write_id('%s|%s' % (entry.id, 'frequency:%d' % len(entry.ids))) | |
else: | |
self.write_id(entry.id) | |
self.write_seq(entry.seq, split) | |
def write_id(self, id): | |
self.output_file_obj.write('>%s\n' % id) | |
def write_seq(self, seq, split = True): | |
if split: | |
seq = self.split(seq) | |
self.output_file_obj.write('%s\n' % seq) | |
def split(self, sequence, piece_length = 80): | |
ticks = range(0, len(sequence), piece_length) + [len(sequence)] | |
return '\n'.join([sequence[ticks[x]:ticks[x + 1]] for x in range(0, len(ticks) - 1)]) | |
def close(self): | |
self.output_file_obj.close() | |
def main(args): | |
try: | |
input = SequenceSource(args.input_fasta, unique = True) | |
except IOError: | |
print 'Error: File does not exist, or you do not have the right permissions to read it: "%s"'\ | |
% args.input_fasta | |
sys.exit(-1) | |
if args.output_fasta: | |
output_file_path = args.output_fasta | |
else: | |
output_file_path = args.input_fasta + '.unique' | |
try: | |
output = FastaOutput(output_file_path) | |
except IOError: | |
print 'Error: You have no permission to write destination: "%s"'\ | |
% output_file_path | |
sys.exit(-1) | |
if args.names_file: | |
names_file_path = args.names_file | |
else: | |
names_file_path = output_file_path + '.names' | |
try: | |
names = open(names_file_path, 'w') | |
except IOError: | |
print 'Error: You have no permission to write destination: "%s"'\ | |
% names_file_path | |
sys.exit(-1) | |
while input.next(): | |
if args.dont_truncate_sequences: | |
if args.dont_include_frequencies: | |
output.store(input, split = False, store_frequencies = False) | |
else: | |
output.store(input, split = False) | |
else: | |
if args.dont_include_frequencies: | |
output.store(input, store_frequencies = False) | |
else: | |
output.store(input) | |
names.write('%s\t%s\n' % (input.id, ','.join(input.ids))) | |
output.close() | |
if __name__ == '__main__': | |
import argparse | |
parser = argparse.ArgumentParser(description='Unique sequences in a FASTA file') | |
parser.add_argument('input_fasta', metavar = 'INPUT_FASTA', | |
help = 'Sequences file in FASTA format') | |
parser.add_argument('-o', '--output-fasta', metavar = 'OUTPUT_FASTA', default = None, | |
help = 'FASTA file to store unique sequences') | |
parser.add_argument('-n', '--names-file', metavar = 'NAMES_FILE', default = None, | |
help = 'FASTA fie to store unique sequences') | |
parser.add_argument('-t', '--dont-truncate-sequences', action = 'store_true', default = False, | |
help = 'When present, sequences would not be truncated into multiple lines when creating FASTA') | |
parser.add_argument('-f', '--dont-include-frequencies', action = 'store_true', default = False, | |
help = 'When present, sequence frequencies would not appear in the defline') | |
args = parser.parse_args() | |
main(args) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment