Skip to content

@jamescasbon /funcparserlib_vcf.py
Created

Embed URL

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.
Download ZIP
pyvcf proposal to use funcparserlib to clarify parsing
import sys
import os
import re
import logging
import collections
import pprint
from funcparserlib.lexer import make_tokenizer, Spec
from funcparserlib.parser import (maybe, many, eof, skip, fwd, name_parser_vars, SyntaxError)
from funcparserlib.contrib.common import const, n, op, op_, sometok
# Object specification
# --------------------
class _HeaderEntry(object):
""" Base class for header entries """
expected = []
def __init__(self, values=None, strict=True, name=None):
self._name = name
self.values = values
if strict and hasattr(values, 'keys'):
missing = set(self.expected) - set(values.keys())
extra = set(values.keys()) - set(self.expected)
if missing:
logging.warning('missing definitions of %s for a %s header entry', missing, self.name)
if extra:
logging.warning('extra definitions %s for a %s header entry', extra, self.name)
@property
def name(self):
if self._name is not None:
return self._name
return self.__class__.__name__
def __repr__(self):
return "HeaderEntry(type=%s, values=%s)" % (self.name, self.values)
def __getattr__(self, name):
try:
return self.values[name]
except KeyError:
raise AttributeError
class _HeaderFormatSpec(_HeaderEntry):
    """ Base class for header entries used to cast values later on in the file.

    The entry's ``Type`` value is used by ``cast`` as the converter for raw
    field strings.
    """

    expected = ['ID', 'Number', 'Type', 'Description']

    def cast(self, string):
        """ Convert ``string`` using this entry's ``Type``.

        Returns None for the missing-value marker ``'.'``.  A value declared
        as int that only parses as a float is returned as a float, with a
        warning.  Anything else that cannot be converted is logged and
        yields None.
        """
        if string == '.':
            return None
        converter = self.Type
        try:
            return converter(string)
        except (TypeError, ValueError):
            # ValueError: the text is not valid for the declared type.
            # TypeError: ``Type`` is not callable, e.g. a type name that was
            # left as a plain string because it has no Python equivalent.
            pass
        # Compare the declared type, not the ``type`` builtin, so that the
        # float fallback for integer fields can actually trigger.
        if converter is int:
            try:
                value = float(string)
            except (TypeError, ValueError):
                pass
            else:
                logging.warning('int type is actually a float')
                return value
        logging.warning('unable to parse value %s as type %s for %s', string, converter, self.name)
        return None
class Header(list):
    """ A collection of header entries.

    The list itself holds every entry in the order given.  INFO, FILTER,
    FORMAT and contig entries are additionally indexed by their ID in
    ``infos``, ``filters``, ``formats`` and ``contigs``.  ``version`` holds
    the value of the leading ``fileformat`` entry, or None if there is none.
    """

    class FORMAT(_HeaderFormatSpec):
        """ A FORMAT header entry """

    class INFO(_HeaderFormatSpec):
        """ An INFO header entry """

    class FILTER(_HeaderEntry):
        """ A FILTER header entry """
        expected = ['ID', 'Description']

    class contig(_HeaderEntry):
        """ A contig header entry """
        expected = ['assembly', 'taxonomy', 'species', 'length', 'ID', 'md5']

    # Header line key -> entry class; any other key becomes a _HeaderEntry.
    _classes = {
        'INFO': INFO,
        'FORMAT': FORMAT,
        'FILTER': FILTER,
        'contig': contig
    }

    # Type names that map onto a Python type; other names are left as text.
    _types = {
        'Integer': int,
        'String': str,
        'Float': float
    }

    def __init__(self, *entries):
        """ Build the header; arguments are passed straight to ``list``. """
        list.__init__(self, *entries)
        self.version = None
        self.infos = {}
        self.filters = {}
        self.formats = {}
        self.contigs = {}
        self._check_version()
        self._create_lookups()

    def _check_version(self):
        """ Record the fileformat value, warning if it is not the first entry """
        if self and self[0].name == 'fileformat':
            self.version = self[0].values
        else:
            logging.warning('header is missing fileformat specification')

    def _create_lookups(self):
        """ Index INFO, FILTER, FORMAT and contig entries by their ID """
        dicts = {
            'INFO': self.infos,
            'FILTER': self.filters,
            'FORMAT': self.formats,
            'contig': self.contigs
        }
        for entry in self:
            dict_ = dicts.get(entry.name)
            if dict_ is None:
                continue
            # An entry may carry a bare value instead of a mapping, or lack
            # an ID; report it instead of aborting the whole header.
            values = entry.values
            entry_id = values.get('ID') if hasattr(values, 'get') else None
            if entry_id is None:
                logging.warning('skipping %s header entry without an ID', entry.name)
                continue
            dict_[entry_id] = entry

    @classmethod
    def _make_entry(cls, a):
        """ Look up the class for a header line.

        ``a`` is a (key, values) pair.  Keys without a dedicated class
        produce a generic _HeaderEntry named after the key.
        """
        key, vals = a[0], a[1]
        # Only the class lookup may fall back to the generic entry; errors
        # raised while constructing the entry are not masked.
        entry_class = cls._classes.get(key)
        if entry_class is None:
            return _HeaderEntry(name=key, values=vals)
        return entry_class(vals)

    @classmethod
    def _make_value(cls, kv):
        """ handle typing of key, value pairs.

        ``kv`` is a (key, value) pair.  It is unpacked in the body because
        tuple parameters in a ``def`` are not valid syntax on Python 3.
        Returns the pair with the value converted where applicable: a known
        Type name becomes the Python type, and Number and length become
        ints when they parse as one.
        """
        k, v = kv
        if k == 'Type':
            return k, cls._types.get(v, v)
        if k in ('Number', 'length'):
            try:
                return k, int(v)
            except ValueError:
                # Not an integer; keep the original text.
                return k, v
        return k, v

    @classmethod
    def _make_ordered_dict(cls, el):
        """ turn a (head, list) pair of key/value pairs into an OrderedDict """
        pairs = [el[0]]
        if len(el) > 1:
            pairs.extend(el[1])
        return collections.OrderedDict(pairs)
# Lexer specification
# -------------------
# The tokenizer tries the specs in order and takes the first one that matches,
# so 'number' must only claim digit runs that form a complete token.  The
# negative lookahead leaves digit-leading values such as md5 sums ("1b22...")
# or "4.1" to the 'name' spec instead of splitting them in two.
specs = [
    Spec('description', r'"([^"]*)"'),
    Spec('op', r'[#<>=,\n]'),
    Spec('number', r'(\d+)(?![A-Za-z_0-9\.:\/-])'),
    Spec('name', r'[A-Za-z_0-9\.:\/-]+'),
]
def tokenize(s):
    """ Lex the string ``s`` with the module-level ``specs``; return a list of tokens """
    lexer = make_tokenizer(specs)
    token_stream = lexer(s)
    return list(token_stream)
# Grammar specification
# ---------------------
# a header value can be a name, a number or a quoted string
value = (sometok('name') | sometok('number') | sometok('description'))
# key value pairs are separated with '='
kv = sometok('name') + op_('=') + value >> Header._make_value
# a list of kvs are comma separated
kvlist = kv + many(op_(',') + kv) >> Header._make_ordered_dict
# header lines are name = (value | <kvlist>)
header_line = (
    op_('#') + op_('#') + sometok('name') + op_('=')
    + (value | op_('<') + kvlist + op_('>'))
) >> Header._make_entry
# The newline after a header line is optional.  Requiring it made many() stop
# before a final line that has no trailing newline (as in ``exf``), silently
# dropping that entry from the parsed header.
header = many(header_line + skip(maybe(op('\n')))) >> Header
# Example header text (fileformat VCFv4.1) that the __main__ demo tokenizes
# and parses.  Note that the text does not end with a newline.
exf = """##fileformat=VCFv4.1
##fileDate=20090805
##source=myImputationProgramV3.1
##reference=file:///seq/references/1000GenomesPilot-NCBI36.fasta
##contig=<ID=20,length=62435964,assembly=B36,md5=f126cdf8a6e0c7f379d618ff66beb2da,species="Homo sapiens",taxonomy=x>
##phasing=partial
##INFO=<ID=NS,Number=1,Type=Integer,Description="Number of Samples With Data">
##INFO=<ID=DP,Number=1,Type=Integer,Description="Total Depth">
##INFO=<ID=AF,Number=A,Type=Float,Description="Allele Frequency">
##INFO=<ID=AA,Number=1,Type=String,Description="Ancestral Allele">
##INFO=<ID=DB,Number=0,Type=Flag,Description="dbSNP membership, build 129">
##INFO=<ID=H2,Number=0,Type=Flag,Description="HapMap2 membership">
##FILTER=<ID=q10,Description="Quality below 10">
##FILTER=<ID=s50,Description="Less than 50% of samples have data">
##FORMAT=<ID=GT,Number=1,Type=String,Description="Genotype">
##FORMAT=<ID=GQ,Number=1,Type=Integer,Description="Genotype Quality">
##FORMAT=<ID=DP,Number=1,Type=Integer,Description="Read Depth">
##FORMAT=<ID=HQ,Number=2,Type=Integer,Description="Haplotype Quality">"""
if __name__ == '__main__':
    # Demo: lex the example header, parse it, and pretty-print the result.
    example_tokens = tokenize(exf)
    parsed_header = header.parse(example_tokens)
    pprint.pprint(parsed_header)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Something went wrong with that request. Please try again.