Last active
October 17, 2016 15:09
-
-
Save suqingdong/bd8c9a1ff5b50f661e1e589f68c5d138 to your computer and use it in GitHub Desktop.
Merge multiple SNP files annotated by ANNOVAR
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Filename: merge_snp.py
# Date: 2016-09-23
# Author: suqingdong
class MergeSNP:
    '''
    Merge several tab-separated SNP tables into a single table.

    Each input file must start with a header line. In every following row
    the first column is the chromosome and the second column is the
    position; all remaining columns are per-sample information.

    Rows are matched across files on (chromosome, position). If a file has
    no row for a position, each of its sample columns is marked "0".
    '''

    # Sort ranks for chromosome names that are not plain integers.
    _CHROM_RANK = {'X': 23, 'Y': 24, 'M': 25, 'MT': 25}

    def __init__(self, file_list, output_filename='merged_snp.xls'):
        '''
        file_list: paths of the tables to merge, in output column order.
        output_filename: path of the merged table written by main().
        '''
        self.file_list = file_list
        self.output_filename = output_filename

    def get_chrpos_dic(self, filename):
        '''
        Return a dict for one file: key is chr+"\\t"+pos, value is the
        tab-joined sample information of that row.

        The header line is skipped. Lines with fewer than two columns
        (e.g. blank lines) are ignored.
        '''
        dic = {}
        with open(filename) as f:
            f.readline()  # skip the header line
            for eachline in f:
                # Only strip the line ending so that empty trailing columns
                # are kept and the row stays aligned with the header.
                fields = eachline.rstrip('\r\n').split('\t', 2)
                if len(fields) < 2:
                    continue
                key = fields[0] + '\t' + fields[1]
                dic[key] = fields[2] if len(fields) > 2 else ''
        return dic

    @staticmethod
    def sorted_rule(chrpos):
        '''
        Sort key for a chr+"\\t"+pos string: (chromosome rank, position).

        Numeric chromosomes rank by their number; X, Y and M/MT rank as
        23, 24 and 25. An optional "chr" prefix and letter case are ignored.
        Raises ValueError for any other chromosome name.
        '''
        chrom, pos = chrpos.split('\t')
        name = chrom.strip().upper()
        if name.startswith('CHR'):
            name = name[3:]
        rank = MergeSNP._CHROM_RANK.get(name)
        if rank is None:
            rank = int(name)
        return rank, int(pos)

    def get_all_dic_and_all_pos(self, file_list):
        '''
        Return (list of per-file dicts, sorted list of every chr+"\\t"+pos
        key seen in any file).
        '''
        all_dic = [self.get_chrpos_dic(each) for each in file_list]
        # set().union(*...) needs no `reduce` (not a builtin on Python 3)
        # and also works when file_list is empty.
        all_pos = set().union(*all_dic)
        return all_dic, sorted(all_pos, key=self.sorted_rule)

    def get_header_and_samplenumber(self, file_list):
        '''
        Return (merged header line, list with the number of sample columns
        of each file).

        The sample count is taken from the first data row, or from the
        header when the file has no data row.
        '''
        columns = ['CHROM', 'POS']
        sample_number = []
        for eachfile in file_list:
            with open(eachfile) as f:
                header_line = f.readline().rstrip('\r\n')
                first_row = f.readline().rstrip('\r\n')
            header_fields = header_line.split('\t', 2)
            columns.append(header_fields[2] if len(header_fields) > 2 else '')
            counted = first_row if first_row.strip() else header_line
            sample_number.append(max(len(counted.split('\t')) - 2, 0))
        # Joining with tabs keeps the columns of consecutive files apart.
        return '\t'.join(columns) + '\n', sample_number

    def main(self):
        '''
        Merge all files in self.file_list and write the result to
        self.output_filename.
        '''
        header, sample_number = self.get_header_and_samplenumber(self.file_list)
        all_dic, sorted_pos = self.get_all_dic_and_all_pos(self.file_list)
        # One "0" per sample column, used where a file lacks the position.
        placeholders = ['\t'.join(['0'] * n) for n in sample_number]
        with open(self.output_filename, 'w') as out:
            out.write(header)
            for chrpos in sorted_pos:
                cells = [dic.get(chrpos, filler)
                         for dic, filler in zip(all_dic, placeholders)]
                out.write('\t'.join([chrpos] + cells) + '\n')
if __name__ == '__main__':
    # Merge the four listed input tables into the default output file.
    MergeSNP([
        'Somatic_snp.annovar.hg19_multianno_only_E3.xls',
        'Somatic_snp.annovar.hg19_multianno_only_E4.xls',
        'Somatic_snp.annovar.hg19_multianno_only_CE.xls',
        'Somatic_snp.annovar.hg19_multianno_only_CC.xls',
    ]).main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment