|
#! /bin/python |
|
import argparse |
|
import gzip |
|
import hashlib |
|
import sys |
|
from contextlib import contextmanager |
|
from prettytable import PrettyTable |
|
|
|
|
|
############### STATISTICS ################## |
|
|
|
def calc_statistical_results(data_dict): |
|
# +-----------------+ |
|
# | varcaller | |
|
# +-----------------+ |
|
# | T | F | |
|
# +-------+---+--------+--------+ |
|
# | Truth | T | hits | misses | |
|
# + - +---+--------+--------+ |
|
# | set | F | noise | | |
|
# +-------+---+--------+--------+ |
|
hit_count = len(data_dict.get("hits", [])) |
|
noise_count = len(data_dict.get("noise", [])) |
|
miss_count = len(data_dict.get("misses", [])) |
|
statistics = { |
|
"precision": hit_count / (hit_count + noise_count), |
|
"recall": hit_count / (hit_count + miss_count), |
|
} |
|
return statistics |
|
|
|
|
|
def compare_variant_lists(new_variants, truth_variant_set): |
|
|
|
new_variant_set = set(new_variants) |
|
new_variants_count = len(new_variant_set) |
|
print("created new set") |
|
|
|
hits = new_variant_set.intersection(truth_variant_set) |
|
misses = truth_variant_set.difference(new_variant_set) |
|
noise = new_variant_set.difference(truth_variant_set) |
|
new_duplicates = len(new_variants) - new_variants_count |
|
|
|
data_dict = { |
|
"new_variants": new_variant_set, |
|
"hits": hits, |
|
"misses": misses, |
|
"noise": noise, |
|
"duplicates": new_duplicates, |
|
} |
|
|
|
statistics = calc_statistical_results(data_dict) |
|
data_dict.update(statistics) |
|
return data_dict |
|
|
|
|
|
def percent(f): |
|
return "{:.4%}".format(f) |
|
|
|
|
|
def dict_to_report_row(varcaller_name, data_dict, varcaller_names): |
|
varcaller_dict = data_dict[varcaller_name] |
|
varcaller_misses = varcaller_dict['misses'] |
|
|
|
other_varcaller_hits = set() |
|
for other_varcaller in varcaller_names: |
|
if other_varcaller != varcaller_name: |
|
other_varcaller_hits |= data_dict[other_varcaller].get("hits") |
|
|
|
other_extra_hits = varcaller_misses.intersection(other_varcaller_hits) |
|
|
|
varcaller_row = [ |
|
varcaller_name, |
|
len(varcaller_dict['hits']), |
|
len(varcaller_misses), |
|
len(other_extra_hits), |
|
len(varcaller_dict['noise']), |
|
varcaller_dict['duplicates'], |
|
percent(varcaller_dict['precision']), |
|
percent(varcaller_dict['recall']), |
|
] |
|
|
|
return varcaller_row |
|
|
|
|
|
def print_report(data_dict, varcaller_names): |
|
if len(varcaller_names) == 1: |
|
varcaller_name = varcaller_names[0] |
|
varcaller_dict = data_dict[varcaller_name] |
|
print(f"calculated hits: found {len(varcaller_dict['hits'])} variants") |
|
print(f"calculated misses: found {len(varcaller_dict['misses'])} variants") |
|
print(f"calculated noise: found {len(varcaller_dict['noise'])} variants") |
|
print(f"calculated duplicates: found {varcaller_dict['duplicates']} variants") |
|
print(f"calculated duplicates: found {varcaller_dict['duplicates']} variants") |
|
print(f"precision: {varcaller_dict['precision']}") |
|
print(f"recall {varcaller_dict['recall']}") |
|
else: |
|
report = PrettyTable() |
|
report.field_names = [ |
|
"Varcaller", |
|
"hits", |
|
"misses", |
|
"found by others", |
|
"noise", |
|
"duplicate", |
|
"precision", |
|
"recall", |
|
] |
|
|
|
for varcaller_name in varcaller_names: |
|
varcaller_row = dict_to_report_row(varcaller_name, data_dict, varcaller_names) |
|
report.add_row(varcaller_row) |
|
|
|
print(report) |
|
|
|
|
|
################# VAR ID ################### |
|
|
|
|
|
def parts_to_var_id(chrom, pos, ref, alt, delim="_"): |
|
chrom_pos = (delim.join(map(str, [chrom, pos]))).upper() # we can't trust var_fields items to be str |
|
ref_alt = (delim.join(map(str, [ref, alt]))).upper() # we can't trust var_fields items to be str |
|
md5_ref_alt = hashlib.md5(ref_alt.encode()).hexdigest() |
|
md5_id = chrom_pos + delim + md5_ref_alt |
|
return md5_id |
|
|
|
|
|
def list_get(list, idx, default=""): |
|
try: |
|
return list[idx] |
|
except IndexError: |
|
return default |
|
|
|
|
|
def get_variant_id(var_fields): |
|
chrom = list_get(var_fields, 0, '') |
|
if chrom.lower().startswith("chr"): |
|
chrom = chrom[3:] |
|
|
|
pos = list_get(var_fields, 1, '') |
|
ref = list_get(var_fields, 3, '') |
|
alt = list_get(var_fields, 4, '') |
|
|
|
return parts_to_var_id(chrom, pos, ref, alt) |
|
|
|
|
|
############### BED HANDLING ################## |
|
|
|
def vcf_to_variants(vcf_file, bed_filter): |
|
variants = [] |
|
with read_file(vcf_file) as vcf_reader: |
|
for variant in vcf_reader: |
|
if variant.startswith('#'): |
|
continue |
|
var_fields = get_fields(variant) |
|
if bed_filter is None or bed_filter(var_fields): |
|
var_id = get_variant_id(var_fields) |
|
variants.append(var_id) |
|
# yield var_id |
|
return variants |
|
|
|
|
|
def parse_bed_line(line): |
|
chrom = list_get(line, 0, '') |
|
if chrom.lower().startswith("chr"): |
|
chrom = chrom[3:] |
|
|
|
start_pos = list_get(line, 1, '') |
|
end_pos = list_get(line, 3, '') |
|
return chrom, start_pos, end_pos |
|
|
|
|
|
def bed_tuple_to_filter(bed_tuple): |
|
return lambda v: bed_tuple[1] < v[1] < bed_tuple[2] if v[0]==bed_tuple[0] else False |
|
|
|
|
|
def bed_to_filter(bed_file): |
|
filters = [] |
|
with read_file(bed_file) as bed_reader: |
|
for line in bed_reader: |
|
if line.startswith('#'): |
|
continue |
|
line_as_tuple = parse_bed_line(line) |
|
filters.append(bed_tuple_to_filter(line_as_tuple)) |
|
return lambda var: any([fltr(var) for fltr in filters]) |
|
|
|
|
|
############### VCF HANDLING ################## |
|
|
|
@contextmanager |
|
def read_file(filename): |
|
""" |
|
:param filename: name of file to open. '-' for stdin/stdout |
|
:return: file descriptor |
|
""" |
|
if filename == '-': |
|
yield sys.stdin |
|
else: |
|
_open = gzip.open if filename.endswith('.gz') else open |
|
with _open(filename, 'rt', encoding='utf-8') as f: |
|
yield f |
|
|
|
|
|
def get_fields(line): |
|
if line.lstrip(" ").startswith("#"): |
|
return None |
|
fields = line.strip("\n").split('\t') |
|
# CHROM POS ID REF ALT QUAL FILTER INFO FORMAT sample1 |
|
if len(fields) < 5: |
|
raise Exception('Issue with line format.\n' + line + str(len(fields))) |
|
return fields |
|
|
|
|
|
def compare_vcfs(new_vcf, truth_vcf, current_vcf, bed_filter): |
|
truth_variants = vcf_to_variants(truth_vcf, bed_filter) |
|
print("loaded truth variants") |
|
new_variants = vcf_to_variants(new_vcf, bed_filter) |
|
print("loaded new variants") |
|
if current_vcf: |
|
current_variants = vcf_to_variants(current_vcf, bed_filter) |
|
print("loaded current variants") |
|
else: |
|
current_variants = [] |
|
|
|
truth_variant_set = set(truth_variants) |
|
print("created truth set") |
|
|
|
data_dict = {"truth_variants_": truth_variant_set, |
|
"new": compare_variant_lists(list(new_variants), truth_variant_set), |
|
} |
|
|
|
if current_vcf: |
|
data_dict["current"] = compare_variant_lists(list(current_variants), truth_variant_set) |
|
print_report(data_dict, ["new", "current"]) |
|
else: |
|
print_report(data_dict, ["new"]) |
|
|
|
|
|
### CLI ### |
|
|
|
def parse_args(): |
|
parser = argparse.ArgumentParser( |
|
description="Take an varcaller vcf and absolute truth vcf as new, " |
|
"and produce statistics regurding varcaller vcf" |
|
) |
|
parser.add_argument('-i', '--varcaller_vcf', dest='varcaller_vcf', |
|
help='Input VCF(.gz) file from Varcaller', default=None) |
|
parser.add_argument('-c', '--current_varcaller_vcf', dest='current_vcf', |
|
help='Input VCF(.gz) file from a second Varcaller, to be compared with the first', default=None) |
|
parser.add_argument('-t', '--truth_vcf', dest='truth_vcf', |
|
help='VCF(.gz) file that represents absolute truth to be compared againts', default=None) |
|
parser.add_argument('-b', '--bam', dest='bam_file', help='the BAM file that was provided to the Varcaller') |
|
parser.add_argument('-r', '--regions-bed', dest='bed_file', help='BED file that was provided to the Varcaller') |
|
run_args = parser.parse_args() |
|
return run_args |
|
|
|
|
|
def main(varcaller_vcf, truth_vcf, current_vcf, bam_file, bed_file): |
|
bed_filter = bed_to_filter(bed_file) if bed_file else None |
|
compare_vcfs(varcaller_vcf, truth_vcf, current_vcf, bed_filter) |
|
|
|
|
|
if __name__ == '__main__': |
|
args = parse_args() |
|
if not args.varcaller_vcf: |
|
print("missing varcaller new file") |
|
elif not args.truth_vcf: |
|
print("missing truth file") |
|
else: |
|
main(args.varcaller_vcf, |
|
args.truth_vcf, |
|
args.current_vcf, |
|
args.bam_file, |
|
args.bed_file) |