Created
March 17, 2018 18:21
-
-
Save djlambert/dc909c4405df12c8a824121b2d4d713b to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import string | |
import pyparsing as pp | |
from sqlalchemy.sql.operators import ColumnOperators | |
from typing import Text | |
from .models.base import Base | |
from .exception.filter_parser_exception import FilterParserException | |
class FilterParser(object): | |
@staticmethod | |
def parse(filter_statement: Text) -> ColumnOperators: | |
try: | |
return _filterStatement.parseString(filter_statement, parseAll=True)[0] | |
except pp.ParseException as exc: | |
raise FilterParserException(exc) | |
def _get_model(tokens: pp.ParseResults): | |
# noinspection PyUnresolvedReferences,PyProtectedMember | |
tokens.model = Base._decl_class_registry[tokens.model] | |
return tokens | |
def _get_model_field(tokens: pp.ParseResults): | |
tokens.modelField = getattr(tokens.modelField.model, tokens.modelField.fieldName) | |
return tokens | |
def _do_method(tokens: pp.ParseResults): | |
return getattr(tokens.modelField, tokens.method.methodName)(*tokens.method.arguments) | |
def _do_expression(tokens: pp.ParseResults): | |
methods = { | |
'==': '__eq__', | |
'LIKE': 'like', | |
'IS': 'is_', | |
'IS NOT': 'isnot' | |
} | |
try: | |
return getattr(tokens.modelField, methods[tokens.expression.operator])(tokens.expression.value) | |
except KeyError: | |
raise FilterParserException(f"Unknown operator '{tokens.expression.operator}'") | |
def _do_statement(tokens: pp.ParseResults): | |
if 'method' in tokens: | |
return _do_method(tokens) | |
elif 'expression' in tokens: | |
return _do_expression(tokens) | |
return tokens | |
def _binary_operation(tokens: pp.ParseResults): | |
methods = { | |
'AND': '__and__', | |
'OR': '__or__' | |
} | |
operation = tokens[0][1] | |
operands = tokens[0][0::2] | |
operand = operands.pop(0) | |
method = methods[operation] | |
for op in operands: | |
operand = getattr(operand, method)(op) | |
return operand | |
def _unary_operation(tokens: pp.ParseResults): | |
methods = { | |
'NOT': '__invert__' | |
} | |
operation, operand = tokens[0] | |
method = methods[operation] | |
return getattr(operand, method)() | |
_leftParenthesis = pp.Suppress('(') | |
_rightParenthesis = pp.Suppress(')') | |
_leftBracket = pp.Suppress('[') | |
_rightBracket = pp.Suppress(']') | |
_dot = pp.Suppress('.') | |
_stringValue = pp.quotedString.setParseAction(pp.removeQuotes) | |
_integerValue = pp.Combine(pp.Optional('-') + pp.Word(pp.nums)).setParseAction(lambda s, l, t: [int(t[0])]) | |
_booleanValue = pp.CaselessKeyword('TRUE') | pp.CaselessKeyword('FALSE') | pp.CaselessKeyword('NULL') | |
_value = _stringValue | _integerValue | _booleanValue | |
_arguments = pp.delimitedList(_value).setResultsName('arguments') | |
_operatorAnd = pp.CaselessKeyword("AND") | |
_operatorOr = pp.CaselessKeyword("OR") | |
_operatorNot = pp.CaselessKeyword("NOT") | |
_operatorEquals = pp.Keyword('==') | |
_operatorNotEqual = pp.Keyword('!=') | |
_anyValueOperator = _operatorEquals | _operatorNotEqual | |
_operatorLike = pp.CaselessKeyword('LIKE') | |
_stringOperator = _operatorLike | |
_operatorIs = pp.CaselessKeyword('IS') | |
_operatorIsNot = pp.CaselessKeyword('IS NOT') | |
_booleanOperator = _operatorIsNot | _operatorIs | |
_modelName = pp.Word(string.ascii_uppercase, pp.alphas).setResultsName('model').setParseAction(_get_model) | |
_fieldName = pp.Word(pp.alphas + '_').setResultsName('fieldName') | |
_modelField = pp.Group(_modelName + _dot + _fieldName).setResultsName('modelField').setParseAction(_get_model_field) | |
_methodStartsWith = pp.CaselessKeyword('startswith').setResultsName('methodName') | |
_methodEndsWith = pp.CaselessKeyword('endswith').setResultsName('methodName') | |
_methodName = _methodStartsWith | _methodEndsWith | |
_method = pp.Group(_dot + _methodName + _leftParenthesis + _arguments + _rightParenthesis).setResultsName('method') | |
_anyExpression = _anyValueOperator.setResultsName('operator') + _value.setResultsName('value') | |
_stringExpression = _stringOperator.setResultsName('operator') + _stringValue.setResultsName('value') | |
_booleanExpression = _booleanOperator.setResultsName('operator') + _booleanValue.setResultsName('value') | |
_expression = pp.Group(_anyExpression | _stringExpression | _booleanExpression).setResultsName('expression') | |
_statement = (_modelField + (_method | _expression)).setParseAction(_do_statement) | |
_filterStatement = pp.infixNotation(_statement, [ | |
(_operatorNot, 1, pp.opAssoc.RIGHT, _unary_operation), | |
(_operatorAnd, 2, pp.opAssoc.LEFT, _binary_operation), | |
(_operatorOr, 2, pp.opAssoc.LEFT, _binary_operation) | |
]) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment