Last active
March 27, 2017 07:50
-
-
Save lozybean/9bb99e48d9d155510383ac9533741113 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# -*- coding: utf-8 -*- \# | |
""" | |
@author = 'liangzb' | |
@date = '2016/7/13 0013' | |
This script is for official clinvar.vcf file's | |
normalization and satisficing ANNOVAR format. | |
samtools is needed for reference read. | |
""" | |
import argparse | |
import re | |
import subprocess | |
from io import StringIO | |
from Bio import SeqIO | |
from vcf import Reader | |
def read_params(): | |
parser = argparse.ArgumentParser(description=__doc__) | |
parser.add_argument('-f', '--reference', dest='reference', | |
metavar='FILE', type=str, required=True, | |
help="reference sequence with samtools index") | |
parser.add_argument('-i', '--vcf_in', dest='vcf_in', | |
metavar='FILE', type=str, required=True, | |
help="input clinvar vcf file, which can be " | |
"downloaded from NCBI ftp site") | |
parser.add_argument('-n', '--normalized', dest='normalized', | |
metavar='FILE', type=str, default=None, | |
help="trans clinvar vcf file after normalization") | |
parser.add_argument('-a', '--annovar_formatted', dest='annovar_formatted', | |
metavar='FILE', type=str, default=None, | |
help="trans clinvar vcf file to satisfy ANNOVAR format") | |
args = parser.parse_args() | |
return args | |
def trans_b37_hg19(chrom): | |
if not chrom.startswith('chr'): | |
if chrom == 'MT': | |
return 'chrM' | |
else: | |
return 'chr' + chrom | |
return chrom | |
class RefGenome(object): | |
def __init__(self, fasta='/bio01/database/genome/hg19/ucsc.hg19.fasta'): | |
self.fasta = fasta | |
def base(self, chromosome, position): | |
return self.region(chromosome, position, position) | |
def region(self, chromosome, start, end): | |
command = ('samtools faidx {self.fasta} ' | |
'{chromosome}:{start}-{end}' | |
.format_map(vars())) | |
result = subprocess.check_output(command, shell=True).decode('utf-8') | |
fp = StringIO(result) | |
for record in SeqIO.parse(fp, 'fasta'): | |
return str(record.seq).upper() | |
ref_genome = RefGenome() | |
class VariantRecord(object): | |
def __init__(self, chrom, pos, ref, alt, | |
id='.', | |
qual='.', | |
filter='.', | |
ref_genome=ref_genome, | |
infos=None, | |
style=None): | |
## trans b37 to hg19 | |
chrom = trans_b37_hg19(chrom) | |
self.chrom = chrom | |
self.pos = int(pos) | |
self.ref = ref | |
self.alt = alt | |
self.ref_genome = ref_genome | |
self.id = id | |
self.qual = qual | |
self.filter = filter | |
self.style = style | |
if infos is None: | |
self.infos = {} | |
else: | |
self.infos = infos | |
def __str__(self): | |
infos = [] | |
for key, value in self.infos.items(): | |
if isinstance(value, bool): | |
infos.append('{key}'.format_map(vars())) | |
elif isinstance(value, list): | |
value = ','.join(v or '.' for v in value) | |
infos.append('{key}={value}'.format_map(vars())) | |
else: | |
if value is None: | |
value = '.' | |
infos.append('{key}={value}'.format_map(vars())) | |
infos = ';'.join(infos) | |
if not infos: | |
infos = '.' | |
return ('{self.chrom}\t{self.pos}\t{self.id}\t' | |
'{self.ref}\t{self.alt}\t{self.qual}\t' | |
'{self.filter}\t{infos}' | |
.format_map(vars())) | |
@staticmethod | |
def is_empty(allele): | |
if allele is None or allele == '-' or allele == '.' or allele == '': | |
return True | |
return False | |
def is_single(self): | |
if len(self.ref) == 1 or len(self.alt) == 1: | |
return True | |
return False | |
def fill_empty(self): | |
if self.is_empty(self.ref): | |
self.pos -= 1 | |
prev_base = ref_genome.base(self.chrom, self.pos) | |
self.ref = prev_base | |
self.alt = prev_base + self.alt | |
if self.is_empty(self.alt): | |
self.pos -= 1 | |
prev_base = ref_genome.base(self.chrom, self.pos) | |
self.ref = prev_base + self.ref | |
self.alt = prev_base | |
def left_alignment(self): | |
while self.alt[-1] == self.ref[-1]: | |
self.pos -= 1 | |
prev_base = ref_genome.base(self.chrom, self.pos) | |
def slide(allele): | |
return prev_base + allele[:-1] | |
self.ref = slide(self.ref) | |
self.alt = slide(self.alt) | |
def parsimony(self, strand='left'): | |
if strand == 'right': | |
while self.ref[-1] == self.alt[-1] and not self.is_single(): | |
self.ref = self.ref[:-1] | |
self.alt = self.alt[:-1] | |
if strand == 'left': | |
while self.ref[0] == self.alt[0] and not self.is_single(): | |
self.ref = self.ref[1:] | |
self.alt = self.alt[1:] | |
self.pos += 1 | |
def normalization(self): | |
if self.style not in ['snp', 'ref']: | |
self.fill_empty() | |
self.left_alignment() | |
self.parsimony() | |
class ClinvarRecord(object): | |
re_snp = re.compile('\w\.(\d+)(\w)>(\w)') | |
patten_position = '\w\.(\d+)(?:_)?(\d+)?' | |
re_mnp = re.compile(patten_position + 'del\w*ins(\w+)') | |
re_insertion = re.compile(patten_position + 'ins(\w+)') | |
re_deletion = re.compile(patten_position + 'del') | |
re_dup = re.compile(patten_position + 'dup') | |
re_ref = re.compile(patten_position + r'(\w+)\\x3d') | |
def __init__(self, chromosome, id, infos, ref_genome=ref_genome): | |
## trans b37 to hg19 | |
chromosome = trans_b37_hg19(chromosome) | |
self.chromesome = chromosome | |
self.id = id | |
self.ref_genome = ref_genome | |
self.infos = infos | |
def get_variant_record(self, hgvs): | |
try: | |
if hgvs.find('>') > 0: | |
position, ref, alt = self.re_snp.search(hgvs).groups() | |
start = end = position | |
style = 'snp' | |
elif hgvs.find('del') > 0 and hgvs.find('ins') > 0: | |
start, end, alt = self.re_mnp.search(hgvs).groups() | |
ref = self.ref_genome.region(self.chromesome, start, end) | |
style = 'mnp' | |
elif hgvs.find('del') > 0: | |
start, end = self.re_deletion.search(hgvs).groups() | |
if end is None: | |
end = start | |
ref = self.ref_genome.region(self.chromesome, start, end) | |
alt = '-' | |
style = 'del' | |
elif hgvs.find('ins') > 0: | |
start, end, alt = self.re_insertion.search(hgvs).groups() | |
start = int(start) + 1 | |
if end is None: | |
end = start | |
ref = '-' | |
style = 'ins' | |
elif hgvs.find('dup') > 0: | |
start, end = self.re_dup.search(hgvs).groups() | |
if end is None: | |
end = start | |
ref = '-' | |
alt = self.ref_genome.region(self.chromesome, start, end) | |
style = 'dup' | |
else: | |
start, end, ref = self.re_ref.search(hgvs).groups() | |
if end is None: | |
end = start | |
alt = ref | |
style = 'ref' | |
except AttributeError: | |
print(hgvs) | |
raise | |
return VariantRecord(self.chromesome, start, ref, alt, | |
id=self.id, | |
ref_genome=self.ref_genome, | |
style=style) | |
def variant_records(self): | |
""" | |
this method will split a clinvar multi record to normalized records | |
:return: | |
""" | |
for ind, clnhgvs in enumerate(self.infos['CLNHGVS']): | |
hgvs = clnhgvs.split(':')[1] | |
record = self.get_variant_record(hgvs) | |
record.normalization() | |
infos = {} | |
for key, value in self.infos.items(): | |
if key == 'CLNALLE': | |
"""CLNALLE is no longer needed""" | |
continue | |
if key.startswith('CLN'): | |
infos[key] = value[ind] | |
else: | |
infos[key] = value | |
record.infos = infos | |
yield record | |
class ClinvarVCF(object): | |
def __init__(self, vcf_file, ref_genome=ref_genome): | |
self.vcf_file = vcf_file | |
self.ref_genome = ref_genome | |
def split_and_normalization(self, file=None): | |
""" | |
split and normalization clinvar vcf to vcf file | |
:return: | |
""" | |
def set_default(d, default='.'): | |
for key, value in d.items(): | |
if value is None: | |
d[key] = default | |
def change_cln_num(d): | |
for key, value in d.items(): | |
if key == 'id' and value.startswith('CLN'): | |
d['num'] = 1 | |
with open(self.vcf_file, 'r') as fp: | |
vcf_reader = Reader(fp) | |
vcf_reader.metadata['reference'] = 'hg19' | |
for key, value in vcf_reader.metadata.items(): | |
if isinstance(value, list): | |
value = ','.join(value) | |
print("##{key}={value}".format_map(vars()), | |
file=file) | |
for id, info in vcf_reader.infos.items(): | |
info = info._asdict() | |
change_cln_num(info) | |
set_default(info) | |
if info['id'] == 'CLNALLE': | |
continue | |
print("##INFO=<ID={id},Number={num},Type={type}," | |
"Description={desc}".format_map(info), | |
file=file) | |
print("#CHROM\tPOS\tID\tREF\tALT\tQUAL\tFILTER\tINFO", | |
file=file) | |
for record in vcf_reader: | |
record = ClinvarRecord(record.CHROM, record.ID, | |
infos=record.INFO, ref_genome=self.ref_genome) | |
for variant_record in record.variant_records(): | |
print(variant_record, file=file) | |
def transfer_annovar_format(self, file=None): | |
def clnsig_transfer(clnsigs): | |
transfer = { | |
'0': 'Uncertain significance', | |
'1': 'not_provided', | |
'2': 'Benign', | |
'3': 'Likely benign', | |
'4': 'Likely pathogenic', | |
'5': 'Pathogenic', | |
'6': 'drug response', | |
'7': 'histocompatibility', | |
'255': 'other', | |
} | |
desc = [] | |
for clnsig in clnsigs.split('|'): | |
desc.append(transfer[clnsig]) | |
return '|'.join(desc) | |
with open(self.vcf_file, 'r') as fp: | |
vcf_reader = Reader(fp) | |
print("#Chr\tStart\tEnd\tRef\tAlt\tCLINSIG\tCLNDBN\tCLNACC\tCLNDSDB\tCLNDSDBID", | |
file=file) | |
for record in vcf_reader: | |
record = ClinvarRecord(record.CHROM, record.ID, | |
infos=record.INFO, ref_genome=self.ref_genome) | |
for variant_record in record.variant_records(): | |
chrom = variant_record.chrom | |
start = variant_record.pos | |
max_allele_len = max(len(variant_record.ref), len(variant_record.alt)) | |
end = variant_record.pos + max_allele_len - 1 | |
ref = variant_record.ref | |
alt = variant_record.alt | |
clnsig = clnsig_transfer(variant_record.infos['CLNSIG']) | |
clndbn = variant_record.infos['CLNDBN'] | |
clnacc = variant_record.infos['CLNACC'] | |
clndsdb = variant_record.infos['CLNDSDB'] | |
clndsdbid = variant_record.infos['CLNDSDBID'] | |
print('{chrom}\t{start}\t{end}\t{ref}\t{alt}\t' | |
'{clnsig}\t{clndbn}\t{clnacc}\t{clndsdb}\t{clndsdbid}' | |
.format_map(vars()), | |
file=file) | |
if __name__ == '__main__': | |
params = read_params() | |
ref_genome = RefGenome(params.reference) | |
vcf_in = ClinvarVCF(params.vcf_in) | |
if params.normalized is not None: | |
with open(params.normalized, 'w') as fp: | |
vcf_in.split_and_normalization(file=fp) | |
if params.annovar_formatted is not None: | |
with open(params.annovar_formatted, 'w') as fp: | |
vcf_in.transfer_annovar_format(file=fp) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment