Skip to content

Instantly share code, notes, and snippets.

@suqingdong
Last active October 17, 2016 15:09
Show Gist options
  • Save suqingdong/bd8c9a1ff5b50f661e1e589f68c5d138 to your computer and use it in GitHub Desktop.
Save suqingdong/bd8c9a1ff5b50f661e1e589f68c5d138 to your computer and use it in GitHub Desktop.
Merge multiple ANNOVAR-annotated SNP files
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Filename: merge_snp.py
# Date: 2016-09-23
# Author: suqingdong
class MergeSNP(object):
    '''
    Merge several tab-separated SNP tables into a single table.

    Every input file is expected to start with one header line, followed by
    rows whose first column is the chromosome and whose second column is the
    position. Rows are matched across files on (chromosome, position); when a
    file has no row for a position, each of its columns is filled with "0".
    '''

    # Sort ranks for non-numeric chromosome names. X and Y follow the
    # autosomes (23, 24); the mitochondrial names are placed after them.
    _SPECIAL_CHROM_RANK = {'X': 23, 'Y': 24, 'M': 25, 'MT': 25}

    def __init__(self, file_list, output_filename='merged_snp.xls'):
        '''
        file_list: paths of the tables to merge, in output column order.
        output_filename: path of the merged table written by main().
        '''
        self.file_list = file_list
        self.output_filename = output_filename

    @staticmethod
    def _split_row(line):
        '''
        Split one line into (chrom, pos, rest), where rest is everything
        after the second column ('' when there are no further columns).
        Return None for blank lines or lines without a position column.
        '''
        # Only the line terminator is removed: stripping all whitespace
        # would silently drop trailing empty columns and shift the table.
        fields = line.rstrip('\r\n').split('\t', 2)
        if len(fields) < 2:
            return None
        rest = fields[2] if len(fields) > 2 else ''
        return fields[0], fields[1], rest

    def get_chrpos_dic(self, filename):
        '''
        Return a dict for one file: the key is chrom + TAB + pos, the value
        is the remainder of that row (all columns after the position).
        The first line of the file is treated as a header and skipped.
        '''
        dic = {}
        with open(filename) as f:
            f.readline()  # skip the header line
            for eachline in f:
                row = self._split_row(eachline)
                if row is None:
                    continue  # blank or malformed line
                chrom, pos, rest = row
                dic[chrom + '\t' + pos] = rest
        return dic

    @staticmethod
    def sorted_rule(chrpos):
        '''
        Sort key for a chrom + TAB + pos string: (chromosome rank, position).

        An optional "chr" prefix is ignored. X, Y and M/MT are ranked 23, 24
        and 25; any other chromosome name must be an integer, otherwise
        ValueError is raised.
        '''
        chrom, pos = chrpos.split('\t')
        if chrom.lower().startswith('chr'):
            chrom = chrom[3:]
        rank = MergeSNP._SPECIAL_CHROM_RANK.get(chrom.upper())
        if rank is None:
            rank = int(chrom)
        return rank, int(pos)

    def get_all_dic_and_all_pos(self, file_list):
        '''
        Return (all_dic, sorted_pos): the per-file dicts in file_list order,
        and every position seen in any file, sorted by sorted_rule.
        '''
        all_dic = [self.get_chrpos_dic(each) for each in file_list]
        # set().union(*dicts) collects the keys of every dict and, unlike
        # the builtin reduce of Python 2, also works for an empty list and
        # needs no import on Python 3.
        all_pos = set().union(*all_dic)
        return all_dic, sorted(all_pos, key=self.sorted_rule)

    def get_header_and_samplenumber(self, file_list):
        '''
        Return (header, sample_number).

        header is the merged header line (newline-terminated): "CHROM", "POS",
        then the remaining header columns of every file, tab-separated.
        sample_number[i] is the number of columns file i contributes per row.
        '''
        header_parts = []
        sample_number = []
        for eachfile in file_list:
            with open(eachfile) as f:
                head = self._split_row(f.readline())
                first_row = self._split_row(f.readline())
            header_parts.append(head[2] if head is not None else '')
            # The width is measured on the first data row, as before
            # (presumably data rows can be wider than the header names --
            # TODO confirm); fall back to the header when there is no data.
            measured = first_row if first_row is not None else head
            if measured is not None and measured[2]:
                sample_number.append(measured[2].count('\t') + 1)
            else:
                sample_number.append(0)
        header = 'CHROM\tPOS\t' + '\t'.join(header_parts) + '\n'
        return header, sample_number

    def main(self):
        '''
        Merge self.file_list and write the result to self.output_filename.
        '''
        header, sample_number = self.get_header_and_samplenumber(self.file_list)
        all_dic, sorted_pos = self.get_all_dic_and_all_pos(self.file_list)
        # One "0<TAB>0<TAB>..." filler per file, built once instead of per row.
        fillers = ['\t'.join(['0'] * n) for n in sample_number]
        with open(self.output_filename, 'w') as out:
            out.write(header)
            for chrpos in sorted_pos:
                parts = [
                    dic.get(chrpos, filler)
                    for dic, filler in zip(all_dic, fillers)
                ]
                # Join with tabs so the columns of adjacent files never fuse.
                out.write(chrpos + '\t' + '\t'.join(parts) + '\n')
if __name__ == '__main__':
    # Imported here so the script entry point is self-contained.
    import sys

    # Inputs used when no paths are given on the command line.
    default_files = [
        'Somatic_snp.annovar.hg19_multianno_only_E3.xls',
        'Somatic_snp.annovar.hg19_multianno_only_E4.xls',
        'Somatic_snp.annovar.hg19_multianno_only_CE.xls',
        'Somatic_snp.annovar.hg19_multianno_only_CC.xls',
    ]
    # Usage: merge_snp.py [FILE ...] -- any arguments replace the defaults.
    file_list = sys.argv[1:] or default_files
    ms = MergeSNP(file_list)
    ms.main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment