Skip to content

Instantly share code, notes, and snippets.

@jamescasbon
Created February 15, 2013 12:35
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jamescasbon/4960150 to your computer and use it in GitHub Desktop.
Save jamescasbon/4960150 to your computer and use it in GitHub Desktop.
pyvcf proposal to use funcparserlib to clarify parsing
import sys
import os
import re
import logging
import collections
import pprint
from funcparserlib.lexer import make_tokenizer, Spec
from funcparserlib.parser import (maybe, many, eof, skip, fwd, name_parser_vars, SyntaxError)
from funcparserlib.contrib.common import const, n, op, op_, sometok
# Object specification
# --------------------
class _HeaderEntry(object):
""" Base class for header entries """
expected = []
def __init__(self, values=None, strict=True, name=None):
self._name = name
self.values = values
if strict and hasattr(values, 'keys'):
missing = set(self.expected) - set(values.keys())
extra = set(values.keys()) - set(self.expected)
if missing:
logging.warning('missing definitions of %s for a %s header entry', missing, self.name)
if extra:
logging.warning('extra definitions %s for a %s header entry', extra, self.name)
@property
def name(self):
if self._name is not None:
return self._name
return self.__class__.__name__
def __repr__(self):
return "HeaderEntry(type=%s, values=%s)" % (self.name, self.values)
def __getattr__(self, name):
try:
return self.values[name]
except KeyError:
raise AttributeError
class _HeaderFormatSpec(_HeaderEntry):
    """ Base class for header entries used to cast values later on in the file """

    expected = ['ID', 'Number', 'Type', 'Description']

    def cast(self, string):
        """ Convert ``string`` using this entry's ``Type``.

        ``self.Type`` is read from ``values['Type']`` and is called with
        ``string``. Returns None for the '.' placeholder. If the call
        raises ValueError and the declared type is ``int``, a float
        conversion is attempted and a warning is logged. If nothing
        works, a warning is logged and None is returned.

        NOTE(review): a ``Type`` that is still a plain string (a type name
        that Header._make_value does not translate) is not callable, so
        the call below raises TypeError -- confirm whether callers rely
        on that.
        """
        if string == '.':
            return None
        try:
            return self.Type(string)
        except ValueError:
            # Compare the *declared* type, not the builtin ``type``.
            if self.Type is int:
                try:
                    value = float(string)
                except ValueError:
                    pass
                else:
                    logging.warning('int type is actually a float')
                    return value
            logging.warning('unable to parse value %s as type %s for %s', string, self.Type, self.name)
        return None
class Header(list):
    """ A collection of header entries.

    The list holds the entries in order. After construction, entries named
    INFO, FILTER, FORMAT and contig are also indexed by their ``ID`` in
    ``infos``, ``filters``, ``formats`` and ``contigs``. ``version`` holds
    the value of a leading ``fileformat`` entry, or None.
    """

    class FORMAT(_HeaderFormatSpec):
        pass

    class INFO(_HeaderFormatSpec):
        pass

    class FILTER(_HeaderEntry):
        expected = ['ID', 'Description']

    class contig(_HeaderEntry):
        expected = ['assembly', 'taxonomy', 'species', 'length', 'ID', 'md5']

    # header line name -> entry class; other names get a plain _HeaderEntry
    _classes = {
        'INFO': INFO,
        'FORMAT': FORMAT,
        'FILTER': FILTER,
        'contig': contig
    }

    def __init__(self, *entries):
        """ Build the list from ``entries`` (forwarded to ``list.__init__``),
        then record the version and fill the per-type lookup dicts. """
        list.__init__(self, *entries)
        self.version = None
        self.infos = {}
        self.filters = {}
        self.formats = {}
        self.contigs = {}
        self._check_version()
        self._create_lookups()

    def _check_version(self):
        """ Set ``version`` from a leading fileformat entry, else warn. """
        if len(self) > 0 and self[0].name == 'fileformat':
            self.version = self[0].values
        else:
            logging.warning('header is missing fileformat specification')

    def _create_lookups(self):
        """ Index INFO/FILTER/FORMAT/contig entries by their ``ID``. """
        dicts = {
            'INFO': self.infos,
            'FILTER': self.filters,
            'FORMAT': self.formats,
            'contig': self.contigs
        }
        for entry in self:
            dict_ = dicts.get(entry.name)
            if dict_ is not None:
                dict_[entry.ID] = entry

    @classmethod
    def _make_entry(cls, a):
        """ Look up the class for a header line.

        ``a`` is indexed as ``(key, values)``. Keys present in ``_classes``
        build that class; any other key builds a generic _HeaderEntry
        named after the key.
        """
        key, vals = a[0], a[1]
        # Resolve the class first so that an error raised while building
        # the entry is not mistaken for an unknown key.
        entry_cls = cls._classes.get(key)
        if entry_cls is None:
            return _HeaderEntry(name=key, values=vals)
        return entry_cls(vals)

    @classmethod
    def _make_value(cls, pair):
        """ handle typing of key, value pairs

        ``pair`` is a ``(key, value)`` 2-sequence. For the ``Type`` key the
        names Integer/String/Float become ``int``/``str``/``float``; other
        names are kept as given. ``Number`` and ``length`` values become
        ints when they parse as ints. Everything else passes through.
        """
        # Unpack in the body: tuple parameters are a syntax error on
        # Python 3, and this form is equally valid on Python 2.
        k, v = pair
        if k == 'Type':
            type_names = {'Integer': int, 'String': str, 'Float': float}
            return k, type_names.get(v, v)
        if k in ('Number', 'length'):
            try:
                return k, int(v)
            except ValueError:
                return k, v
        return k, v

    @classmethod
    def _make_ordered_dict(cls, el):
        """ Build an OrderedDict from a (first_pair, [more_pairs]) result.

        ``el[0]`` is the first key/value pair; when present, ``el[1]`` is a
        list of further pairs appended after it.
        """
        pairs = [el[0]]
        if len(el) > 1:
            pairs = pairs + el[1]
        return collections.OrderedDict(pairs)
# Lexer specification
# -------------------
# Token specifications handed to make_tokenizer() in tokenize().
# NOTE(review): presumably the tokenizer tries these in list order, which
# would make the position of 'number' ahead of 'name' significant (the
# 'name' pattern also accepts digits) -- confirm against funcparserlib.
specs = [
    # a double-quoted string containing no embedded double quotes
    Spec('description', r'"([^"]*)"'),
    # one structural character: '#', '<', '>', '=', ',' or a newline
    Spec('op', r'[#<>=,\n]'),
    # a run of decimal digits
    Spec('number', r'(\d+)'),
    # a run of letters, digits, '_', '.', ':', '/' or '-'
    Spec('name', r'[A-Za-z_0-9\.:\/-]+'),
]
def tokenize(s):
    """ Lex the string ``s`` with the module-level ``specs``.

    Returns every token produced by the tokenizer, collected into a list.
    """
    lexer = make_tokenizer(specs)
    token_stream = lexer(s)
    return list(token_stream)
# Grammar specification
# ---------------------
# A header value is one token: a name, a number or a quoted description.
value = sometok('name') | sometok('number') | sometok('description')

# A key/value pair is ``name = value``; Header._make_value types the pair.
kv = (sometok('name') + op_('=') + value) >> Header._make_value

# A kv list is one pair followed by any number of ``, pair`` repeats,
# collected by Header._make_ordered_dict.
kvlist = (kv + many(op_(',') + kv)) >> Header._make_ordered_dict

# A header line is ``## name = value`` or ``## name = < kvlist >``;
# Header._make_entry turns the result into an entry object.
header_line = (
    op_('#') + op_('#') + sometok('name') + op_('=')
    + (value | (op_('<') + kvlist + op_('>')))
) >> Header._make_entry

# The header is any number of newline-terminated header lines.
header = many(header_line + op_('\n')) >> Header
# Sample header used by the demo below. The text has no trailing newline.
exf = """##fileformat=VCFv4.1
##fileDate=20090805
##source=myImputationProgramV3.1
##reference=file:///seq/references/1000GenomesPilot-NCBI36.fasta
##contig=<ID=20,length=62435964,assembly=B36,md5=f126cdf8a6e0c7f379d618ff66beb2da,species="Homo sapiens",taxonomy=x>
##phasing=partial
##INFO=<ID=NS,Number=1,Type=Integer,Description="Number of Samples With Data">
##INFO=<ID=DP,Number=1,Type=Integer,Description="Total Depth">
##INFO=<ID=AF,Number=A,Type=Float,Description="Allele Frequency">
##INFO=<ID=AA,Number=1,Type=String,Description="Ancestral Allele">
##INFO=<ID=DB,Number=0,Type=Flag,Description="dbSNP membership, build 129">
##INFO=<ID=H2,Number=0,Type=Flag,Description="HapMap2 membership">
##FILTER=<ID=q10,Description="Quality below 10">
##FILTER=<ID=s50,Description="Less than 50% of samples have data">
##FORMAT=<ID=GT,Number=1,Type=String,Description="Genotype">
##FORMAT=<ID=GQ,Number=1,Type=Integer,Description="Genotype Quality">
##FORMAT=<ID=DP,Number=1,Type=Integer,Description="Read Depth">
##FORMAT=<ID=HQ,Number=2,Type=Integer,Description="Haplotype Quality">"""

if __name__ == '__main__':
    # The ``header`` grammar requires a newline after every header line,
    # so terminate the sample explicitly; without it the final ##FORMAT
    # line is not consumed and is missing from the printed result.
    pprint.pprint(header.parse(tokenize(exf + '\n')))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment