Skip to content

Instantly share code, notes, and snippets.

@djlambert
Created March 17, 2018 18:21
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save djlambert/dc909c4405df12c8a824121b2d4d713b to your computer and use it in GitHub Desktop.
Save djlambert/dc909c4405df12c8a824121b2d4d713b to your computer and use it in GitHub Desktop.
import string
import pyparsing as pp
from sqlalchemy.sql.operators import ColumnOperators
from typing import Text
from .models.base import Base
from .exception.filter_parser_exception import FilterParserException
class FilterParser(object):
@staticmethod
def parse(filter_statement: Text) -> ColumnOperators:
try:
return _filterStatement.parseString(filter_statement, parseAll=True)[0]
except pp.ParseException as exc:
raise FilterParserException(exc)
def _get_model(tokens: pp.ParseResults):
# noinspection PyUnresolvedReferences,PyProtectedMember
tokens.model = Base._decl_class_registry[tokens.model]
return tokens
def _get_model_field(tokens: pp.ParseResults):
tokens.modelField = getattr(tokens.modelField.model, tokens.modelField.fieldName)
return tokens
def _do_method(tokens: pp.ParseResults):
return getattr(tokens.modelField, tokens.method.methodName)(*tokens.method.arguments)
def _do_expression(tokens: pp.ParseResults):
methods = {
'==': '__eq__',
'LIKE': 'like',
'IS': 'is_',
'IS NOT': 'isnot'
}
try:
return getattr(tokens.modelField, methods[tokens.expression.operator])(tokens.expression.value)
except KeyError:
raise FilterParserException(f"Unknown operator '{tokens.expression.operator}'")
def _do_statement(tokens: pp.ParseResults):
if 'method' in tokens:
return _do_method(tokens)
elif 'expression' in tokens:
return _do_expression(tokens)
return tokens
def _binary_operation(tokens: pp.ParseResults):
methods = {
'AND': '__and__',
'OR': '__or__'
}
operation = tokens[0][1]
operands = tokens[0][0::2]
operand = operands.pop(0)
method = methods[operation]
for op in operands:
operand = getattr(operand, method)(op)
return operand
def _unary_operation(tokens: pp.ParseResults):
methods = {
'NOT': '__invert__'
}
operation, operand = tokens[0]
method = methods[operation]
return getattr(operand, method)()
_leftParenthesis = pp.Suppress('(')
_rightParenthesis = pp.Suppress(')')
_leftBracket = pp.Suppress('[')
_rightBracket = pp.Suppress(']')
_dot = pp.Suppress('.')
_stringValue = pp.quotedString.setParseAction(pp.removeQuotes)
_integerValue = pp.Combine(pp.Optional('-') + pp.Word(pp.nums)).setParseAction(lambda s, l, t: [int(t[0])])
_booleanValue = pp.CaselessKeyword('TRUE') | pp.CaselessKeyword('FALSE') | pp.CaselessKeyword('NULL')
_value = _stringValue | _integerValue | _booleanValue
_arguments = pp.delimitedList(_value).setResultsName('arguments')
_operatorAnd = pp.CaselessKeyword("AND")
_operatorOr = pp.CaselessKeyword("OR")
_operatorNot = pp.CaselessKeyword("NOT")
_operatorEquals = pp.Keyword('==')
_operatorNotEqual = pp.Keyword('!=')
_anyValueOperator = _operatorEquals | _operatorNotEqual
_operatorLike = pp.CaselessKeyword('LIKE')
_stringOperator = _operatorLike
_operatorIs = pp.CaselessKeyword('IS')
_operatorIsNot = pp.CaselessKeyword('IS NOT')
_booleanOperator = _operatorIsNot | _operatorIs
_modelName = pp.Word(string.ascii_uppercase, pp.alphas).setResultsName('model').setParseAction(_get_model)
_fieldName = pp.Word(pp.alphas + '_').setResultsName('fieldName')
_modelField = pp.Group(_modelName + _dot + _fieldName).setResultsName('modelField').setParseAction(_get_model_field)
_methodStartsWith = pp.CaselessKeyword('startswith').setResultsName('methodName')
_methodEndsWith = pp.CaselessKeyword('endswith').setResultsName('methodName')
_methodName = _methodStartsWith | _methodEndsWith
_method = pp.Group(_dot + _methodName + _leftParenthesis + _arguments + _rightParenthesis).setResultsName('method')
_anyExpression = _anyValueOperator.setResultsName('operator') + _value.setResultsName('value')
_stringExpression = _stringOperator.setResultsName('operator') + _stringValue.setResultsName('value')
_booleanExpression = _booleanOperator.setResultsName('operator') + _booleanValue.setResultsName('value')
_expression = pp.Group(_anyExpression | _stringExpression | _booleanExpression).setResultsName('expression')
_statement = (_modelField + (_method | _expression)).setParseAction(_do_statement)
_filterStatement = pp.infixNotation(_statement, [
(_operatorNot, 1, pp.opAssoc.RIGHT, _unary_operation),
(_operatorAnd, 2, pp.opAssoc.LEFT, _binary_operation),
(_operatorOr, 2, pp.opAssoc.LEFT, _binary_operation)
])
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment