Skip to content

Instantly share code, notes, and snippets.

@lozybean
Last active October 28, 2016 08:09
Show Gist options
  • Save lozybean/464d8f5267bb30c1c1dc1e5532b2a29d to your computer and use it in GitHub Desktop.
Save lozybean/464d8f5267bb30c1c1dc1e5532b2a29d to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
# -*- coding: utf-8 -*- \#
"""
@author = 'liangzb'
@date = '2016/10/28 0028'
This script is used for FILTER marker in .vcf files
1. if filterName or genotypeFilterName is startswith 'SNP'
or 'INDEL', only corresponding record will be check.
2. filterExpression and genotypeFilterExpression is a simple
python expression.
3. filterName and filterExpression is one correspondence,
genotypeFilterName and genotypeFilterExpression is one
correspondence.
"""
import argparse
from ast import *
from functools import lru_cache
from vcf import VCFReader, VCFWriter
from vcf.parser import _Filter
class NInf(object):
def __lt__(self, other):
return True
def __le__(self, other):
return True
def __gt__(self, other):
return False
def __ge__(self, other):
return False
def __eq__(self, other):
if isinstance(other, NInf):
return True
else:
return False
def __neg__(self):
return Inf()
class Inf(object):
def __lt__(self, other):
return False
def __le__(self, other):
return False
def __gt__(self, other):
return True
def __ge__(self, other):
return True
def __eq__(self, other):
if isinstance(other, Inf):
return True
else:
return False
def __neg__(self):
return NInf()
class FilterMarker(object):
inf = Inf()
def __init__(self):
self.args = self.read_params()
self.check_filter_expressions()
@staticmethod
def read_params():
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument('-i', '--vcf_in', dest='vcf_in',
metavar='FILE', type=str, required=True,
help="input vcf file")
parser.add_argument('-o', '--vcf_out', dest='vcf_out',
metavar='FILE', type=str, required=True,
help="output vcf file")
parser.add_argument('--filterName', nargs='+', dest='filterName',
metavar='STR', type=str, default=[], action='append',
help="set the filter names")
parser.add_argument('--filterExpression', nargs='+', dest='filterExpression',
metavar='EXPRESSION', type=str, default=[], action='append',
help="set the filter expressions")
parser.add_argument('--genotypeFilterName', nargs='+', dest='genotypeFilterName',
metavar='STR', type=str, default=[], action='append',
help="set the genotype filter names")
parser.add_argument('--genotypeFilterExpression', nargs='+', dest='genotypeFilterExpression',
metavar='EXPRESSION', type=str, default=[], action='append',
help="set the genotype filter expressions")
parser.add_argument('--inf_defaults', nargs='+', dest='inf_defaults',
metavar='STR', type=str, default=[],
help="set defaults items in INFO be +inf")
parser.add_argument('--neg_inf_defaults', nargs='+', dest='neg_inf_defaults',
metavar='STR', type=str, default=[],
help="set defaults items in INFO be -inf")
args = parser.parse_args()
args.filterName = [filterName for sublist in args.filterName
for filterName in sublist]
args.filterExpression = [filterExpression for sublist in args.filterExpression
for filterExpression in sublist]
if len(args.filterName) != len(args.filterExpression):
raise ValueError("filterName and filterExpression must be"
"one correspondence")
if len(args.genotypeFilterName) != len(args.genotypeFilterExpression):
raise ValueError("genotypeFilterName and genotypeFilterExpression"
"must be one correspondence")
return args
@property
@lru_cache(1)
def filters(self):
filters = {k: v for k, v in zip(self.args.filterName,
self.args.filterExpression)}
return filters
@property
@lru_cache(1)
def genotype_filters(self):
filters = {k: v for k, v in zip(self.args.genotypeFilterName,
self.args.genotypeFilterExpression)}
return filters
@property
@lru_cache(1)
def user_defaults(self):
defaults = {}
for p_default in self.args.inf_defaults:
defaults.update({p_default: self.inf})
for n_default in self.args.neg_inf_defaults:
defaults.update({n_default: -self.inf})
return defaults
def check_filter_expressions(self):
for f in self.filters.values():
self.check_eval(f)
for f in self.genotype_filters.values():
self.check_eval(f)
def add_filter_to_metainfo(self, template):
for k, v in self.filters.items():
template.filters.update({
k: _Filter(id=k, desc=v)
})
for k, v in self.genotype_filters.items():
template.filters.update({
k: _Filter(id=k, desc=v)
})
return template
def check_eval(self, node_or_string):
if isinstance(node_or_string, str):
node_or_string = compile(node_or_string, '', 'eval', PyCF_ONLY_AST)
if isinstance(node_or_string, Expression):
node_or_string = node_or_string.body
if isinstance(node_or_string, BoolOp):
for value in node_or_string.values:
self.check_eval(value)
elif isinstance(node_or_string, Compare):
self.check_eval(node_or_string.left)
for comparator in node_or_string.comparators:
self.check_eval(comparator)
elif isinstance(node_or_string, UnaryOp):
self.check_eval(node_or_string.operand)
elif isinstance(node_or_string, BinOp):
self.check_eval(node_or_string.left)
self.check_eval(node_or_string.right)
elif isinstance(node_or_string, Subscript):
self.check_eval(node_or_string.value)
elif isinstance(node_or_string, (Name, Num)):
# only Name or Num base element is allowed
pass
else:
raise ValueError("unsupported expression!")
return None
def __call__(self):
with open(self.args.vcf_in) as fp_in, open(self.args.vcf_out, 'w') as fp_out:
vcf_reader = VCFReader(fp_in)
template = self.add_filter_to_metainfo(vcf_reader)
vcf_writer = VCFWriter(fp_out, template)
for record in vcf_reader:
default_values = {
'QUAL': record.QUAL,
'DP': self.inf,
'MQ': self.inf,
'QD': self.inf,
'FS': -self.inf,
'SOR': -self.inf,
'MQRankSum': self.inf,
'ReadPosRankSum': self.inf,
'InbreedingCoeff': self.inf,
}
default_values.update(self.user_defaults)
# print(default_values)
if record.FILTER is None:
record.FILTER = []
for k, v in self.filters.items():
if k.startswith('SNP') and not record.is_snp:
continue
if k.startswith('INDEL') and not record.is_indel:
continue
judgement = eval(v, default_values, record.INFO)
if judgement:
record.FILTER.append(k)
for k, v in self.genotype_filters.items():
judgement = eval(v, {}, record.samples[0].data.__dict__)
if judgement:
record.FILTER.append(k)
vcf_writer.write_record(record)
if __name__ == '__main__':
marker = FilterMarker()
marker()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment