Skip to content

Instantly share code, notes, and snippets.

@suqingdong
Created November 19, 2019 04:48
Show Gist options
  • Save suqingdong/3654b71d62d97507bda1049c7f916646 to your computer and use it in GitHub Desktop.
Save suqingdong/3654b71d62d97507bda1049c7f916646 to your computer and use it in GitHub Desktop.
mutations statistics
#!/usr/bin/env python
# -*- encoding: utf8 -*-
"""
mutation statistics for each sample
"""
import os
import re
import time
from functools import partial
from collections import defaultdict
import prettytable
__author__ = 'suqingdong'
__author_email__ = 'suqingdong@novogene.com'
def safe_open(filename, mode='r'):
if 'w' in mode:
dirname = os.path.dirname(filename)
if dirname and not os.path.exists(dirname):
os.makedirs(dirname)
if filename.endswith('.gz'):
import gzip
return gzip.open(filename, mode)
return open(filename, mode)
def is_mutation(not_mut_pattern, genotype):
if not_mut_pattern.match(genotype):
return False
return True
def show_stats(mut_stats):
print('mutation statistics:')
table = prettytable.PrettyTable(['Sample', 'Count', 'Percent'])
mut_stats_sort = sorted(mut_stats.iteritems(),
key=lambda (x, y): len(x.split(',')),
reverse=False)
total = sum(mut_stats.itervalues())
for sample, count in mut_stats_sort:
if len(sample.split(',')) == 1:
sample += ' only'
else:
sample += ' share'
percent = '{:.2f}%'.format(float(count) * 100 / total)
table.add_row([sample, count, percent])
# set style
table.align['sample'] = 'l'
# custom footer
table.add_row(str(table).split('\n', 1)[0].split('+')[1:4])
table.add_row(['Total', total, '100%'])
print('\033[3;36m{}\033[0m'.format(table))
def mutation_stats(infile, outfile):
mut_stats = defaultdict(int)
with safe_open(infile) as f, safe_open(outfile, 'w') as out:
not_mut_pattern = re.compile(r'^(0/0)|(\./\.):')
for line in f:
linelist = line.strip().split('\t')
if linelist[0] in ('CHROM', 'Priority'):
headerlist = linelist
start = headerlist.index('FORMAT') + 1
end = headerlist.index('Ori_REF')
samples = headerlist[start:end]
print('samples({}): {}'.format(len(samples), samples))
out_header = '\t'.join(headerlist + ['Mut_Samples']) + '\n'
out.write(out_header)
continue
genotypes = linelist[start:end]
mutations = map(partial(is_mutation, not_mut_pattern), genotypes) # [True, False, ...]
mut_samples = ','.join(s for s, m in zip(samples, mutations) if m)
if not mut_samples:
continue
mut_stats[mut_samples] += 1
out_line = '\t'.join(linelist + [mut_samples]) + '\n'
out.write(out_line)
print('outfile: {}'.format(outfile))
return mut_stats
def main():
start_time = time.time()
infile = args['infile']
outfile = args['outfile']
mut_stats = mutation_stats(infile, outfile)
show_stats(mut_stats)
print('time used: {:.2f}s'.format(time.time() - start_time))
if __name__ == '__main__':
import argparse
parser = argparse.ArgumentParser(
prog='mutation_stats',
description=__doc__,
formatter_class=argparse.RawTextHelpFormatter)
parser.add_argument('infile', help='the input annovar annotated file', nargs='?')
parser.add_argument(
'-o', '--outfile', help='the output filename[%(default)s]', default='out.xls')
args = vars(parser.parse_args())
if not args['infile']:
parser.print_help()
exit()
main()
@suqingdong
Copy link
Author

image
image

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment