Skip to content

Instantly share code, notes, and snippets.

@lozybean
Last active March 27, 2017 07:50
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 2 You must be signed in to fork a gist
  • Save lozybean/9bb99e48d9d155510383ac9533741113 to your computer and use it in GitHub Desktop.
Save lozybean/9bb99e48d9d155510383ac9533741113 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
# -*- coding: utf-8 -*- \#
"""
@author = 'liangzb'
@date = '2016/7/13 0013'
This script is for official clinvar.vcf file's
normalization and satisficing ANNOVAR format.
samtools is needed for reference read.
"""
import argparse
import re
import subprocess
from io import StringIO
from Bio import SeqIO
from vcf import Reader
def read_params():
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument('-f', '--reference', dest='reference',
metavar='FILE', type=str, required=True,
help="reference sequence with samtools index")
parser.add_argument('-i', '--vcf_in', dest='vcf_in',
metavar='FILE', type=str, required=True,
help="input clinvar vcf file, which can be "
"downloaded from NCBI ftp site")
parser.add_argument('-n', '--normalized', dest='normalized',
metavar='FILE', type=str, default=None,
help="trans clinvar vcf file after normalization")
parser.add_argument('-a', '--annovar_formatted', dest='annovar_formatted',
metavar='FILE', type=str, default=None,
help="trans clinvar vcf file to satisfy ANNOVAR format")
args = parser.parse_args()
return args
def trans_b37_hg19(chrom):
if not chrom.startswith('chr'):
if chrom == 'MT':
return 'chrM'
else:
return 'chr' + chrom
return chrom
class RefGenome(object):
def __init__(self, fasta='/bio01/database/genome/hg19/ucsc.hg19.fasta'):
self.fasta = fasta
def base(self, chromosome, position):
return self.region(chromosome, position, position)
def region(self, chromosome, start, end):
command = ('samtools faidx {self.fasta} '
'{chromosome}:{start}-{end}'
.format_map(vars()))
result = subprocess.check_output(command, shell=True).decode('utf-8')
fp = StringIO(result)
for record in SeqIO.parse(fp, 'fasta'):
return str(record.seq).upper()
ref_genome = RefGenome()
class VariantRecord(object):
def __init__(self, chrom, pos, ref, alt,
id='.',
qual='.',
filter='.',
ref_genome=ref_genome,
infos=None,
style=None):
## trans b37 to hg19
chrom = trans_b37_hg19(chrom)
self.chrom = chrom
self.pos = int(pos)
self.ref = ref
self.alt = alt
self.ref_genome = ref_genome
self.id = id
self.qual = qual
self.filter = filter
self.style = style
if infos is None:
self.infos = {}
else:
self.infos = infos
def __str__(self):
infos = []
for key, value in self.infos.items():
if isinstance(value, bool):
infos.append('{key}'.format_map(vars()))
elif isinstance(value, list):
value = ','.join(v or '.' for v in value)
infos.append('{key}={value}'.format_map(vars()))
else:
if value is None:
value = '.'
infos.append('{key}={value}'.format_map(vars()))
infos = ';'.join(infos)
if not infos:
infos = '.'
return ('{self.chrom}\t{self.pos}\t{self.id}\t'
'{self.ref}\t{self.alt}\t{self.qual}\t'
'{self.filter}\t{infos}'
.format_map(vars()))
@staticmethod
def is_empty(allele):
if allele is None or allele == '-' or allele == '.' or allele == '':
return True
return False
def is_single(self):
if len(self.ref) == 1 or len(self.alt) == 1:
return True
return False
def fill_empty(self):
if self.is_empty(self.ref):
self.pos -= 1
prev_base = ref_genome.base(self.chrom, self.pos)
self.ref = prev_base
self.alt = prev_base + self.alt
if self.is_empty(self.alt):
self.pos -= 1
prev_base = ref_genome.base(self.chrom, self.pos)
self.ref = prev_base + self.ref
self.alt = prev_base
def left_alignment(self):
while self.alt[-1] == self.ref[-1]:
self.pos -= 1
prev_base = ref_genome.base(self.chrom, self.pos)
def slide(allele):
return prev_base + allele[:-1]
self.ref = slide(self.ref)
self.alt = slide(self.alt)
def parsimony(self, strand='left'):
if strand == 'right':
while self.ref[-1] == self.alt[-1] and not self.is_single():
self.ref = self.ref[:-1]
self.alt = self.alt[:-1]
if strand == 'left':
while self.ref[0] == self.alt[0] and not self.is_single():
self.ref = self.ref[1:]
self.alt = self.alt[1:]
self.pos += 1
def normalization(self):
if self.style not in ['snp', 'ref']:
self.fill_empty()
self.left_alignment()
self.parsimony()
class ClinvarRecord(object):
re_snp = re.compile('\w\.(\d+)(\w)>(\w)')
patten_position = '\w\.(\d+)(?:_)?(\d+)?'
re_mnp = re.compile(patten_position + 'del\w*ins(\w+)')
re_insertion = re.compile(patten_position + 'ins(\w+)')
re_deletion = re.compile(patten_position + 'del')
re_dup = re.compile(patten_position + 'dup')
re_ref = re.compile(patten_position + r'(\w+)\\x3d')
def __init__(self, chromosome, id, infos, ref_genome=ref_genome):
## trans b37 to hg19
chromosome = trans_b37_hg19(chromosome)
self.chromesome = chromosome
self.id = id
self.ref_genome = ref_genome
self.infos = infos
def get_variant_record(self, hgvs):
try:
if hgvs.find('>') > 0:
position, ref, alt = self.re_snp.search(hgvs).groups()
start = end = position
style = 'snp'
elif hgvs.find('del') > 0 and hgvs.find('ins') > 0:
start, end, alt = self.re_mnp.search(hgvs).groups()
ref = self.ref_genome.region(self.chromesome, start, end)
style = 'mnp'
elif hgvs.find('del') > 0:
start, end = self.re_deletion.search(hgvs).groups()
if end is None:
end = start
ref = self.ref_genome.region(self.chromesome, start, end)
alt = '-'
style = 'del'
elif hgvs.find('ins') > 0:
start, end, alt = self.re_insertion.search(hgvs).groups()
start = int(start) + 1
if end is None:
end = start
ref = '-'
style = 'ins'
elif hgvs.find('dup') > 0:
start, end = self.re_dup.search(hgvs).groups()
if end is None:
end = start
ref = '-'
alt = self.ref_genome.region(self.chromesome, start, end)
style = 'dup'
else:
start, end, ref = self.re_ref.search(hgvs).groups()
if end is None:
end = start
alt = ref
style = 'ref'
except AttributeError:
print(hgvs)
raise
return VariantRecord(self.chromesome, start, ref, alt,
id=self.id,
ref_genome=self.ref_genome,
style=style)
def variant_records(self):
"""
this method will split a clinvar multi record to normalized records
:return:
"""
for ind, clnhgvs in enumerate(self.infos['CLNHGVS']):
hgvs = clnhgvs.split(':')[1]
record = self.get_variant_record(hgvs)
record.normalization()
infos = {}
for key, value in self.infos.items():
if key == 'CLNALLE':
"""CLNALLE is no longer needed"""
continue
if key.startswith('CLN'):
infos[key] = value[ind]
else:
infos[key] = value
record.infos = infos
yield record
class ClinvarVCF(object):
def __init__(self, vcf_file, ref_genome=ref_genome):
self.vcf_file = vcf_file
self.ref_genome = ref_genome
def split_and_normalization(self, file=None):
"""
split and normalization clinvar vcf to vcf file
:return:
"""
def set_default(d, default='.'):
for key, value in d.items():
if value is None:
d[key] = default
def change_cln_num(d):
for key, value in d.items():
if key == 'id' and value.startswith('CLN'):
d['num'] = 1
with open(self.vcf_file, 'r') as fp:
vcf_reader = Reader(fp)
vcf_reader.metadata['reference'] = 'hg19'
for key, value in vcf_reader.metadata.items():
if isinstance(value, list):
value = ','.join(value)
print("##{key}={value}".format_map(vars()),
file=file)
for id, info in vcf_reader.infos.items():
info = info._asdict()
change_cln_num(info)
set_default(info)
if info['id'] == 'CLNALLE':
continue
print("##INFO=<ID={id},Number={num},Type={type},"
"Description={desc}".format_map(info),
file=file)
print("#CHROM\tPOS\tID\tREF\tALT\tQUAL\tFILTER\tINFO",
file=file)
for record in vcf_reader:
record = ClinvarRecord(record.CHROM, record.ID,
infos=record.INFO, ref_genome=self.ref_genome)
for variant_record in record.variant_records():
print(variant_record, file=file)
def transfer_annovar_format(self, file=None):
def clnsig_transfer(clnsigs):
transfer = {
'0': 'Uncertain significance',
'1': 'not_provided',
'2': 'Benign',
'3': 'Likely benign',
'4': 'Likely pathogenic',
'5': 'Pathogenic',
'6': 'drug response',
'7': 'histocompatibility',
'255': 'other',
}
desc = []
for clnsig in clnsigs.split('|'):
desc.append(transfer[clnsig])
return '|'.join(desc)
with open(self.vcf_file, 'r') as fp:
vcf_reader = Reader(fp)
print("#Chr\tStart\tEnd\tRef\tAlt\tCLINSIG\tCLNDBN\tCLNACC\tCLNDSDB\tCLNDSDBID",
file=file)
for record in vcf_reader:
record = ClinvarRecord(record.CHROM, record.ID,
infos=record.INFO, ref_genome=self.ref_genome)
for variant_record in record.variant_records():
chrom = variant_record.chrom
start = variant_record.pos
max_allele_len = max(len(variant_record.ref), len(variant_record.alt))
end = variant_record.pos + max_allele_len - 1
ref = variant_record.ref
alt = variant_record.alt
clnsig = clnsig_transfer(variant_record.infos['CLNSIG'])
clndbn = variant_record.infos['CLNDBN']
clnacc = variant_record.infos['CLNACC']
clndsdb = variant_record.infos['CLNDSDB']
clndsdbid = variant_record.infos['CLNDSDBID']
print('{chrom}\t{start}\t{end}\t{ref}\t{alt}\t'
'{clnsig}\t{clndbn}\t{clnacc}\t{clndsdb}\t{clndsdbid}'
.format_map(vars()),
file=file)
if __name__ == '__main__':
params = read_params()
ref_genome = RefGenome(params.reference)
vcf_in = ClinvarVCF(params.vcf_in)
if params.normalized is not None:
with open(params.normalized, 'w') as fp:
vcf_in.split_and_normalization(file=fp)
if params.annovar_formatted is not None:
with open(params.annovar_formatted, 'w') as fp:
vcf_in.transfer_annovar_format(file=fp)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment