Last active
October 28, 2016 08:09
-
-
Save lozybean/464d8f5267bb30c1c1dc1e5532b2a29d to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# -*- coding: utf-8 -*- \# | |
""" | |
@author = 'liangzb' | |
@date = '2016/10/28 0028' | |
This script is used for FILTER marker in .vcf files | |
1. if filterName or genotypeFilterName is startswith 'SNP' | |
or 'INDEL', only corresponding record will be check. | |
2. filterExpression and genotypeFilterExpression is a simple | |
python expression. | |
3. filterName and filterExpression is one correspondence, | |
genotypeFilterName and genotypeFilterExpression is one | |
correspondence. | |
""" | |
import argparse | |
from ast import * | |
from functools import lru_cache | |
from vcf import VCFReader, VCFWriter | |
from vcf.parser import _Filter | |
class NInf(object): | |
def __lt__(self, other): | |
return True | |
def __le__(self, other): | |
return True | |
def __gt__(self, other): | |
return False | |
def __ge__(self, other): | |
return False | |
def __eq__(self, other): | |
if isinstance(other, NInf): | |
return True | |
else: | |
return False | |
def __neg__(self): | |
return Inf() | |
class Inf(object): | |
def __lt__(self, other): | |
return False | |
def __le__(self, other): | |
return False | |
def __gt__(self, other): | |
return True | |
def __ge__(self, other): | |
return True | |
def __eq__(self, other): | |
if isinstance(other, Inf): | |
return True | |
else: | |
return False | |
def __neg__(self): | |
return NInf() | |
class FilterMarker(object): | |
inf = Inf() | |
def __init__(self): | |
self.args = self.read_params() | |
self.check_filter_expressions() | |
@staticmethod | |
def read_params(): | |
parser = argparse.ArgumentParser(description=__doc__) | |
parser.add_argument('-i', '--vcf_in', dest='vcf_in', | |
metavar='FILE', type=str, required=True, | |
help="input vcf file") | |
parser.add_argument('-o', '--vcf_out', dest='vcf_out', | |
metavar='FILE', type=str, required=True, | |
help="output vcf file") | |
parser.add_argument('--filterName', nargs='+', dest='filterName', | |
metavar='STR', type=str, default=[], action='append', | |
help="set the filter names") | |
parser.add_argument('--filterExpression', nargs='+', dest='filterExpression', | |
metavar='EXPRESSION', type=str, default=[], action='append', | |
help="set the filter expressions") | |
parser.add_argument('--genotypeFilterName', nargs='+', dest='genotypeFilterName', | |
metavar='STR', type=str, default=[], action='append', | |
help="set the genotype filter names") | |
parser.add_argument('--genotypeFilterExpression', nargs='+', dest='genotypeFilterExpression', | |
metavar='EXPRESSION', type=str, default=[], action='append', | |
help="set the genotype filter expressions") | |
parser.add_argument('--inf_defaults', nargs='+', dest='inf_defaults', | |
metavar='STR', type=str, default=[], | |
help="set defaults items in INFO be +inf") | |
parser.add_argument('--neg_inf_defaults', nargs='+', dest='neg_inf_defaults', | |
metavar='STR', type=str, default=[], | |
help="set defaults items in INFO be -inf") | |
args = parser.parse_args() | |
args.filterName = [filterName for sublist in args.filterName | |
for filterName in sublist] | |
args.filterExpression = [filterExpression for sublist in args.filterExpression | |
for filterExpression in sublist] | |
if len(args.filterName) != len(args.filterExpression): | |
raise ValueError("filterName and filterExpression must be" | |
"one correspondence") | |
if len(args.genotypeFilterName) != len(args.genotypeFilterExpression): | |
raise ValueError("genotypeFilterName and genotypeFilterExpression" | |
"must be one correspondence") | |
return args | |
@property | |
@lru_cache(1) | |
def filters(self): | |
filters = {k: v for k, v in zip(self.args.filterName, | |
self.args.filterExpression)} | |
return filters | |
@property | |
@lru_cache(1) | |
def genotype_filters(self): | |
filters = {k: v for k, v in zip(self.args.genotypeFilterName, | |
self.args.genotypeFilterExpression)} | |
return filters | |
@property | |
@lru_cache(1) | |
def user_defaults(self): | |
defaults = {} | |
for p_default in self.args.inf_defaults: | |
defaults.update({p_default: self.inf}) | |
for n_default in self.args.neg_inf_defaults: | |
defaults.update({n_default: -self.inf}) | |
return defaults | |
def check_filter_expressions(self): | |
for f in self.filters.values(): | |
self.check_eval(f) | |
for f in self.genotype_filters.values(): | |
self.check_eval(f) | |
def add_filter_to_metainfo(self, template): | |
for k, v in self.filters.items(): | |
template.filters.update({ | |
k: _Filter(id=k, desc=v) | |
}) | |
for k, v in self.genotype_filters.items(): | |
template.filters.update({ | |
k: _Filter(id=k, desc=v) | |
}) | |
return template | |
def check_eval(self, node_or_string): | |
if isinstance(node_or_string, str): | |
node_or_string = compile(node_or_string, '', 'eval', PyCF_ONLY_AST) | |
if isinstance(node_or_string, Expression): | |
node_or_string = node_or_string.body | |
if isinstance(node_or_string, BoolOp): | |
for value in node_or_string.values: | |
self.check_eval(value) | |
elif isinstance(node_or_string, Compare): | |
self.check_eval(node_or_string.left) | |
for comparator in node_or_string.comparators: | |
self.check_eval(comparator) | |
elif isinstance(node_or_string, UnaryOp): | |
self.check_eval(node_or_string.operand) | |
elif isinstance(node_or_string, BinOp): | |
self.check_eval(node_or_string.left) | |
self.check_eval(node_or_string.right) | |
elif isinstance(node_or_string, Subscript): | |
self.check_eval(node_or_string.value) | |
elif isinstance(node_or_string, (Name, Num)): | |
# only Name or Num base element is allowed | |
pass | |
else: | |
raise ValueError("unsupported expression!") | |
return None | |
def __call__(self): | |
with open(self.args.vcf_in) as fp_in, open(self.args.vcf_out, 'w') as fp_out: | |
vcf_reader = VCFReader(fp_in) | |
template = self.add_filter_to_metainfo(vcf_reader) | |
vcf_writer = VCFWriter(fp_out, template) | |
for record in vcf_reader: | |
default_values = { | |
'QUAL': record.QUAL, | |
'DP': self.inf, | |
'MQ': self.inf, | |
'QD': self.inf, | |
'FS': -self.inf, | |
'SOR': -self.inf, | |
'MQRankSum': self.inf, | |
'ReadPosRankSum': self.inf, | |
'InbreedingCoeff': self.inf, | |
} | |
default_values.update(self.user_defaults) | |
# print(default_values) | |
if record.FILTER is None: | |
record.FILTER = [] | |
for k, v in self.filters.items(): | |
if k.startswith('SNP') and not record.is_snp: | |
continue | |
if k.startswith('INDEL') and not record.is_indel: | |
continue | |
judgement = eval(v, default_values, record.INFO) | |
if judgement: | |
record.FILTER.append(k) | |
for k, v in self.genotype_filters.items(): | |
judgement = eval(v, {}, record.samples[0].data.__dict__) | |
if judgement: | |
record.FILTER.append(k) | |
vcf_writer.write_record(record) | |
if __name__ == '__main__': | |
marker = FilterMarker() | |
marker() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment