Skip to content

Instantly share code, notes, and snippets.

@daTokenizer
Last active June 10, 2019 09:02
Show Gist options
  • Save daTokenizer/9ab022c0919b8ad093ce431703712e8f to your computer and use it in GitHub Desktop.
Save daTokenizer/9ab022c0919b8ad093ce431703712e8f to your computer and use it in GitHub Desktop.
VCF file statistical comparison and analysis suite

VARCALLER Statistics Package

  • This directory contains tools and scripts for automated, local analysis and evaluation of varcaller change to a pipeline.
  • As time progresses, these tools should allow R&D departments to add, remove, perform parameter searches and develop quality functions without needing their scientific department.

Usage

  • run GIAB son sample through the varcaller
  • get the bam.bed file from the mapper
  • get the current output vcf of the pipeline
  • minimize the varcaller vcf using prepare.sh
  • minimize the son NIST golden vcf using prepare.sh
  • minimize the current pipeline vcf using prepare.sh

## Scripts

  1. prepare.sh - Intersects and prepares a given vcf outputted by a varcaller to fit quality standards (db > 4), coding regions, and NIST high confidence regions. usage: ./prepare.sh <mapper used>.bam.bed <file to minimize>.vcf
  2. analize.py - performs statistical analysis of one or more VCFs in comparison to a known Truth VCF. usage:
  • ./analize.py -i <new varcaller vcf> -c <current varcaller vcf> -t <truth vcf> to analyze a pair of VCFs in tandem and compare their results in a single command or
  • ./analize.py -i <new varcaller vcf> -t <truth vcf> to analyse a single VCF without context.
  • ./analize.py -h is available for more options

results:

  • Example output:
loaded new variants
loaded current variants
created truth set
created new set
created new set
+-----------+-------+--------+-----------------+-------+-----------+-----------+----------+
| Varcaller |  hits | misses | found by others | noise | duplicate | precision |  recall  |
+-----------+-------+--------+-----------------+-------+-----------+-----------+----------+
|    new    | 48105 |  981   |        0        |  1335 |     0     |  97.2998% | 98.0015% |
|  current  | 46516 |  2570  |       1589      |  501  |     0     |  98.9344% | 94.7643% |
+-----------+-------+--------+-----------------+-------+-----------+-----------+----------+
  • all you need to know about the statistical results can be understood from this high production value graphic:
             +-----------------+
             |    varcaller    |
             +-----------------+
             |    T   |    F   |
 +-------+---+--------+--------+
 | Truth | T |  hits  | misses |
 +   -   +---+--------+--------+
 |  set  | F | noise  |        |
 +-------+---+--------+--------+
  • the found by others column denotes how many of one varcaller's misses were hits on the other vcf(s)
#! /bin/python
import argparse
import gzip
import hashlib
import sys
from contextlib import contextmanager
from prettytable import PrettyTable
############### STATISTICS ##################
def calc_statistical_results(data_dict):
    """Compute precision and recall from a comparison result dict.

    Confusion-matrix layout (varcaller vs. truth set):

                 +-----------------+
                 |    varcaller    |
                 +--------+--------+
                 |    T   |    F   |
     +-------+---+--------+--------+
     | Truth | T |  hits  | misses |
     |  set  | F | noise  |        |
     +-------+---+--------+--------+

    :param data_dict: dict with optional "hits", "noise", "misses" collections
    :return: dict with "precision" and "recall" floats (0.0 when undefined)
    """
    hit_count = len(data_dict.get("hits", []))
    noise_count = len(data_dict.get("noise", []))
    miss_count = len(data_dict.get("misses", []))
    called_total = hit_count + noise_count  # everything the varcaller reported
    truth_total = hit_count + miss_count    # everything present in the truth set
    # guard the empty-input case instead of raising ZeroDivisionError
    statistics = {
        "precision": hit_count / called_total if called_total else 0.0,
        "recall": hit_count / truth_total if truth_total else 0.0,
    }
    return statistics
def compare_variant_lists(new_variants, truth_variant_set):
    """Classify a list of variant ids against the truth set.

    :param new_variants: list of variant ids (may contain duplicates)
    :param truth_variant_set: set of variant ids considered ground truth
    :return: dict with the classified sets, duplicate count, and statistics
    """
    candidate_set = set(new_variants)
    print("created new set")
    # duplicates are whatever was lost when collapsing the list into a set
    duplicate_count = len(new_variants) - len(candidate_set)
    result = {
        "new_variants": candidate_set,
        "hits": candidate_set & truth_variant_set,
        "misses": truth_variant_set - candidate_set,
        "noise": candidate_set - truth_variant_set,
        "duplicates": duplicate_count,
    }
    result.update(calc_statistical_results(result))
    return result
def percent(f):
    """Format a fraction as a percentage string with four decimal places."""
    return format(f, ".4%")
def dict_to_report_row(varcaller_name, data_dict, varcaller_names):
    """Build one PrettyTable row for a varcaller's comparison results.

    The "found by others" column counts this varcaller's misses that were
    hit by at least one of the other varcallers.

    :param varcaller_name: key of the varcaller to report on
    :param data_dict: mapping of varcaller name -> comparison result dict
    :param varcaller_names: names of all varcallers in the report
    :return: list of cell values for PrettyTable.add_row
    """
    varcaller_dict = data_dict[varcaller_name]
    varcaller_misses = varcaller_dict['misses']
    other_varcaller_hits = set()
    for other_varcaller in varcaller_names:
        if other_varcaller != varcaller_name:
            # default to an empty set so a missing "hits" key cannot
            # poison the union with None (bug fix: .get() had no default)
            other_varcaller_hits |= data_dict[other_varcaller].get("hits", set())
    other_extra_hits = varcaller_misses.intersection(other_varcaller_hits)
    varcaller_row = [
        varcaller_name,
        len(varcaller_dict['hits']),
        len(varcaller_misses),
        len(other_extra_hits),
        len(varcaller_dict['noise']),
        varcaller_dict['duplicates'],
        percent(varcaller_dict['precision']),
        percent(varcaller_dict['recall']),
    ]
    return varcaller_row
def print_report(data_dict, varcaller_names):
    """Print comparison results: plain text for one varcaller, a table for several.

    :param data_dict: mapping of varcaller name -> comparison result dict
    :param varcaller_names: varcallers to include in the report
    """
    if len(varcaller_names) == 1:
        varcaller_name = varcaller_names[0]
        varcaller_dict = data_dict[varcaller_name]
        print(f"calculated hits: found {len(varcaller_dict['hits'])} variants")
        print(f"calculated misses: found {len(varcaller_dict['misses'])} variants")
        print(f"calculated noise: found {len(varcaller_dict['noise'])} variants")
        # bug fix: the duplicates line was printed twice in the original
        print(f"calculated duplicates: found {varcaller_dict['duplicates']} variants")
        print(f"precision: {varcaller_dict['precision']}")
        print(f"recall {varcaller_dict['recall']}")
    else:
        report = PrettyTable()
        report.field_names = [
            "Varcaller",
            "hits",
            "misses",
            "found by others",
            "noise",
            "duplicate",
            "precision",
            "recall",
        ]
        for varcaller_name in varcaller_names:
            report.add_row(dict_to_report_row(varcaller_name, data_dict, varcaller_names))
        print(report)
################# VAR ID ###################
def parts_to_var_id(chrom, pos, ref, alt, delim="_"):
    """Build a canonical variant id of the form CHROM_POS_<md5 of REF_ALT>.

    All parts are stringified (they may arrive as non-strings) and
    uppercased before hashing so the id is case-insensitive.
    """
    location = delim.join([str(chrom), str(pos)]).upper()
    allele_key = delim.join([str(ref), str(alt)]).upper()
    digest = hashlib.md5(allele_key.encode()).hexdigest()
    return delim.join([location, digest])
def list_get(seq, idx, default=""):
    """Return seq[idx], or default when idx is out of range.

    :param seq: any indexable sequence (renamed from `list`, which
        shadowed the builtin)
    :param idx: index to fetch
    :param default: value returned on IndexError
    """
    try:
        return seq[idx]
    except IndexError:
        return default
def get_variant_id(var_fields):
    """Derive the canonical variant id from split VCF fields.

    Uses CHROM (col 0), POS (col 1), REF (col 3), ALT (col 4); missing
    columns default to the empty string.
    """
    chrom = list_get(var_fields, 0, '')
    # normalize "chr1" / "CHR1" style prefixes down to the bare name
    if chrom.lower().startswith("chr"):
        chrom = chrom[3:]
    pos, ref, alt = (list_get(var_fields, i, '') for i in (1, 3, 4))
    return parts_to_var_id(chrom, pos, ref, alt)
############### BED HANDLING ##################
def vcf_to_variants(vcf_file, bed_filter):
    """Read a VCF file and return the list of canonical variant ids.

    :param vcf_file: path to a VCF(.gz) file, or '-' for stdin
    :param bed_filter: predicate over split VCF fields, or None to keep all
    :return: list of variant ids (may contain duplicates)
    """
    variants = []
    with read_file(vcf_file) as vcf_reader:
        for variant in vcf_reader:
            if variant.startswith('#'):
                continue
            var_fields = get_fields(variant)
            # bug fix: get_fields returns None for comment lines with
            # leading whitespace that the startswith('#') check misses;
            # previously those crashed the filter / id extraction
            if var_fields is None:
                continue
            if bed_filter is None or bed_filter(var_fields):
                variants.append(get_variant_id(var_fields))
    return variants
def parse_bed_line(line):
    """Split one BED line into (chrom, start_pos, end_pos) strings.

    Bug fix: the original indexed the raw line string directly (yielding
    single characters instead of columns) and read the end position from
    column 3; standard BED puts the end coordinate in column 2.

    :param line: one tab-separated line of a BED file
    :return: (chrom without "chr" prefix, start, end) as strings
    """
    fields = line.strip("\n").split('\t')
    chrom = list_get(fields, 0, '')
    if chrom.lower().startswith("chr"):
        chrom = chrom[3:]
    start_pos = list_get(fields, 1, '')
    end_pos = list_get(fields, 2, '')
    return chrom, start_pos, end_pos
def bed_tuple_to_filter(bed_tuple):
    """Build a predicate testing whether a variant lies inside one BED region.

    Bug fixes: coordinates are compared numerically (the original compared
    strings, so e.g. "9" > "10"), and the end coordinate is included to
    match BED's 0-based half-open interval against VCF's 1-based POS
    (variant inside [start, end) exactly when start < pos <= end).

    :param bed_tuple: (chrom, start, end) from parse_bed_line
    :return: function mapping split VCF fields -> bool
    """
    chrom, start, end = bed_tuple

    def region_filter(var_fields):
        try:
            if var_fields[0] != chrom:
                return False
            return int(start) < int(var_fields[1]) <= int(end)
        except (ValueError, TypeError, IndexError):
            # malformed coordinates can never match a region
            return False

    return region_filter
def bed_to_filter(bed_file):
    """Compile a whole BED file into one predicate over split VCF fields.

    :param bed_file: path to a BED(.gz) file, or '-' for stdin
    :return: function returning True when any region accepts the variant
    """
    region_filters = []
    with read_file(bed_file) as bed_reader:
        for raw_line in bed_reader:
            if raw_line.startswith('#'):
                continue
            region = parse_bed_line(raw_line)
            region_filters.append(bed_tuple_to_filter(region))
    # a variant passes if at least one region filter accepts it
    return lambda var: any(fltr(var) for fltr in region_filters)
############### VCF HANDLING ##################
@contextmanager
def read_file(filename):
    """Open a possibly-gzipped text file for reading.

    :param filename: path to open; '-' means read from stdin
    :return: yields a text-mode file object
    """
    if filename == '-':
        yield sys.stdin
        return
    # pick the opener by extension; both support text mode with encoding
    opener = gzip.open if filename.endswith('.gz') else open
    with opener(filename, 'rt', encoding='utf-8') as handle:
        yield handle
def get_fields(line):
    """Split a VCF data line into its tab-separated fields.

    Column layout: CHROM POS ID REF ALT QUAL FILTER INFO FORMAT sample1...

    :param line: one line of a VCF file
    :return: list of field strings, or None for comment/header lines
    :raises ValueError: when fewer than the 5 mandatory columns are present
        (ValueError subclasses Exception, so existing callers still work)
    """
    if line.lstrip(" ").startswith("#"):
        return None
    fields = line.strip("\n").split('\t')
    if len(fields) < 5:
        # bug fix: the original concatenated line and field count with no
        # separator, producing an unreadable message
        raise ValueError(
            "Issue with line format ({} fields): {!r}".format(len(fields), line)
        )
    return fields
def compare_vcfs(new_vcf, truth_vcf, current_vcf, bed_filter):
    """Load the truth, new, and optional current VCFs, compare, and report.

    :param new_vcf: path to the new varcaller's VCF
    :param truth_vcf: path to the ground-truth VCF
    :param current_vcf: path to the current varcaller's VCF, or falsy to skip
    :param bed_filter: predicate over split VCF fields, or None
    """
    truth_variants = vcf_to_variants(truth_vcf, bed_filter)
    print("loaded truth variants")
    new_variants = vcf_to_variants(new_vcf, bed_filter)
    print("loaded new variants")
    current_variants = []
    if current_vcf:
        current_variants = vcf_to_variants(current_vcf, bed_filter)
        print("loaded current variants")

    truth_variant_set = set(truth_variants)
    print("created truth set")

    data_dict = {
        "truth_variants_": truth_variant_set,
        "new": compare_variant_lists(list(new_variants), truth_variant_set),
    }
    report_names = ["new"]
    if current_vcf:
        data_dict["current"] = compare_variant_lists(
            list(current_variants), truth_variant_set)
        report_names.append("current")
    print_report(data_dict, report_names)
### CLI ###
def parse_args():
    """Parse the command-line arguments for the VCF comparison tool.

    :return: argparse.Namespace with varcaller_vcf, current_vcf, truth_vcf,
        bam_file, and bed_file attributes
    """
    parser = argparse.ArgumentParser(
        # typo fixes in user-facing help text: "an varcaller" -> "a
        # varcaller", "regurding" -> "regarding", "againts" -> "against"
        description="Take a varcaller vcf and absolute truth vcf as new, "
                    "and produce statistics regarding varcaller vcf"
    )
    parser.add_argument('-i', '--varcaller_vcf', dest='varcaller_vcf',
                        help='Input VCF(.gz) file from Varcaller', default=None)
    parser.add_argument('-c', '--current_varcaller_vcf', dest='current_vcf',
                        help='Input VCF(.gz) file from a second Varcaller, to be compared with the first',
                        default=None)
    parser.add_argument('-t', '--truth_vcf', dest='truth_vcf',
                        help='VCF(.gz) file that represents absolute truth to be compared against',
                        default=None)
    parser.add_argument('-b', '--bam', dest='bam_file',
                        help='the BAM file that was provided to the Varcaller')
    parser.add_argument('-r', '--regions-bed', dest='bed_file',
                        help='BED file that was provided to the Varcaller')
    run_args = parser.parse_args()
    return run_args
def main(varcaller_vcf, truth_vcf, current_vcf, bam_file, bed_file):
    """Entry point: build the optional BED region filter and run the comparison."""
    # without a BED file, every variant passes through unfiltered
    region_filter = None
    if bed_file:
        region_filter = bed_to_filter(bed_file)
    compare_vcfs(varcaller_vcf, truth_vcf, current_vcf, region_filter)
if __name__ == '__main__':
    # CLI entry point: the new varcaller VCF (-i) and the truth VCF (-t)
    # are mandatory; current VCF (-c), BAM (-b) and BED (-r) are optional.
    args = parse_args()
    if not args.varcaller_vcf:
        print("missing varcaller new file")
    elif not args.truth_vcf:
        print("missing truth file")
    else:
        main(args.varcaller_vcf,
             args.truth_vcf,
             args.current_vcf,
             args.bam_file,
             args.bed_file)
#!/bin/sh
# Minimize a VCF down to high-confidence, coding, NIST-aligned regions.
# usage: <this_script>.sh <mapper used>.bam.bed <file to minimize>.vcf
#
# Bug fix: step 1 previously wrote EMG_high_confidance_regions.bam.bed
# while step 2 read high_confidance_regions.bam.bed, so the pipeline ran
# on a stale or missing file. Positional parameters are now quoted to
# survive paths containing spaces.

# 1. Drop low-coverage lines (trailing depth 1-4) from the mapper's BED.
zcat "$1" | grep -v "\s[1-4]$" > high_confidance_regions.bam.bed
# 2. Merge regions closer than 10bp, then keep only coding regions.
bedtools merge -d 10 -i high_confidance_regions.bam.bed > high_confidance_regions.minimized.bed
bedtools intersect -a high_confidance_regions.minimized.bed -b coding_regions.bed > high_confidance_coding_regions.bed
# 3. Prefix the NIST GIAB high-confidence BED with "chr" to match our
#    chromosome naming, then intersect with the coding regions.
sed s/^/chr/ HG002_GRCh37_GIAB_highconf_CG-IllFB-IllGATKHC-Ion-10X-SOLID_CHROM1-22_v.3.3.2_highconf_noinconsistent.bed > NIST_high_confidance_regions.aligned.bed
bedtools intersect -a NIST_high_confidance_regions.aligned.bed -b high_confidance_coding_regions.bed > NIST_aligned_highconf_coding_regions.bed
# 4. Keep the VCF header, then append only variants inside the final regions.
grep "#" "$2" > "$2".minimized.vcf
bedtools intersect -wa -a "$2" -b NIST_aligned_highconf_coding_regions.bed >> "$2".minimized.vcf
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment