Created
November 19, 2019 04:48
-
-
Save suqingdong/3654b71d62d97507bda1049c7f916646 to your computer and use it in GitHub Desktop.
mutations statistics
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# -*- encoding: utf8 -*- | |
""" | |
mutation statistics for each sample | |
""" | |
import os | |
import re | |
import time | |
from functools import partial | |
from collections import defaultdict | |
import prettytable | |
__author__ = 'suqingdong' | |
__author_email__ = 'suqingdong@novogene.com' | |
def safe_open(filename, mode='r'): | |
if 'w' in mode: | |
dirname = os.path.dirname(filename) | |
if dirname and not os.path.exists(dirname): | |
os.makedirs(dirname) | |
if filename.endswith('.gz'): | |
import gzip | |
return gzip.open(filename, mode) | |
return open(filename, mode) | |
def is_mutation(not_mut_pattern, genotype): | |
if not_mut_pattern.match(genotype): | |
return False | |
return True | |
def show_stats(mut_stats): | |
print('mutation statistics:') | |
table = prettytable.PrettyTable(['Sample', 'Count', 'Percent']) | |
mut_stats_sort = sorted(mut_stats.iteritems(), | |
key=lambda (x, y): len(x.split(',')), | |
reverse=False) | |
total = sum(mut_stats.itervalues()) | |
for sample, count in mut_stats_sort: | |
if len(sample.split(',')) == 1: | |
sample += ' only' | |
else: | |
sample += ' share' | |
percent = '{:.2f}%'.format(float(count) * 100 / total) | |
table.add_row([sample, count, percent]) | |
# set style | |
table.align['sample'] = 'l' | |
# custom footer | |
table.add_row(str(table).split('\n', 1)[0].split('+')[1:4]) | |
table.add_row(['Total', total, '100%']) | |
print('\033[3;36m{}\033[0m'.format(table)) | |
def mutation_stats(infile, outfile): | |
mut_stats = defaultdict(int) | |
with safe_open(infile) as f, safe_open(outfile, 'w') as out: | |
not_mut_pattern = re.compile(r'^(0/0)|(\./\.):') | |
for line in f: | |
linelist = line.strip().split('\t') | |
if linelist[0] in ('CHROM', 'Priority'): | |
headerlist = linelist | |
start = headerlist.index('FORMAT') + 1 | |
end = headerlist.index('Ori_REF') | |
samples = headerlist[start:end] | |
print('samples({}): {}'.format(len(samples), samples)) | |
out_header = '\t'.join(headerlist + ['Mut_Samples']) + '\n' | |
out.write(out_header) | |
continue | |
genotypes = linelist[start:end] | |
mutations = map(partial(is_mutation, not_mut_pattern), genotypes) # [True, False, ...] | |
mut_samples = ','.join(s for s, m in zip(samples, mutations) if m) | |
if not mut_samples: | |
continue | |
mut_stats[mut_samples] += 1 | |
out_line = '\t'.join(linelist + [mut_samples]) + '\n' | |
out.write(out_line) | |
print('outfile: {}'.format(outfile)) | |
return mut_stats | |
def main(): | |
start_time = time.time() | |
infile = args['infile'] | |
outfile = args['outfile'] | |
mut_stats = mutation_stats(infile, outfile) | |
show_stats(mut_stats) | |
print('time used: {:.2f}s'.format(time.time() - start_time)) | |
if __name__ == '__main__': | |
import argparse | |
parser = argparse.ArgumentParser( | |
prog='mutation_stats', | |
description=__doc__, | |
formatter_class=argparse.RawTextHelpFormatter) | |
parser.add_argument('infile', help='the input annovar annotated file', nargs='?') | |
parser.add_argument( | |
'-o', '--outfile', help='the output filename[%(default)s]', default='out.xls') | |
args = vars(parser.parse_args()) | |
if not args['infile']: | |
parser.print_help() | |
exit() | |
main() |
Author
suqingdong
commented
Nov 19, 2019
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment