-
-
Save soasme/1685e80a6f70311dbb801823ac8827fa to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from pyparsing import Literal,CaselessLiteral,Word,Group,Optional,\ | |
ZeroOrMore,Forward,nums,alphas,alphanums,Regex,ParseException,\ | |
CaselessKeyword, Suppress | |
import math | |
import operator | |
def proc_align(window, algorithm, ts):
    """Placeholder implementation of the ``align`` query function.

    Returns ``ts`` unchanged; ``window`` and ``algorithm`` are accepted but
    currently ignored.
    """
    return ts


def proc_ts(v1, v2):
    """Placeholder implementation of the ``ts`` query function.

    Always returns ``1``; both arguments are currently ignored.
    """
    return 1


# Binary operators keyed by their spelling in the query language.  Each
# callable is invoked as ``opn[op](left, right)``, so every symbol must map
# to the operator of the same meaning (">" -> gt, "<" -> lt, ...).
opn = {
    "+": operator.add,
    "-": operator.sub,
    "*": operator.mul,
    "/": operator.truediv,
    ">": operator.gt,
    ">=": operator.ge,
    "<": operator.lt,
    "<=": operator.le,
    "==": operator.eq,
    "!=": operator.ne,
    "^": operator.pow,
}

fn = {  # key: (func, arg_number)
    "int": (int, 1),
    "float": (float, 1),
    "sin": (math.sin, 1),
    "cos": (math.cos, 1),
    "tan": (math.tan, 1),
    "exp": (math.exp, 1),
    "abs": (abs, 1),
    "trunc": (int, 1),
    "round": (round, 1),
    "align": (proc_align, 3),
    "ts": (proc_ts, 2),
}

_ARITHMETIC_OPS = frozenset("+-*/^")
_COMPARISON_OPS = frozenset(("<", "<=", ">", ">=", "==", "!="))
_LOGICAL_OPS = frozenset(("and", "or"))
_BINARY_OPS = _ARITHMETIC_OPS | _COMPARISON_OPS | _LOGICAL_OPS


def evaluate_stack(s, ctx):
    """Evaluate the postfix expression held in ``s`` and return its value.

    ``s`` is consumed from the end (it is mutated).  Entries are either:

    * a ``(name, argc)`` tuple -- a call of function ``name`` written with
      ``argc`` arguments;
    * a string starting with ``"`` -- a quoted string literal, returned
      without its surrounding quotes;
    * an operator (``+ - * / ^``, a comparison, ``not``, ``and``, ``or``);
    * an identifier, looked up in the mapping ``ctx``;
    * anything else, which is converted with ``float``.

    Comparisons and the logical operators always yield the int ``0`` or
    ``1``.  Both operands of ``and``/``or`` are always evaluated, because
    they are already on the stack.

    Raises:
        NameError: unknown function name, or identifier missing from ``ctx``.
        TypeError: a function was called with the wrong number of arguments.
    """
    op = s.pop()

    if isinstance(op, tuple):
        name, argc = op
        if name not in fn:
            raise NameError('unknown function: %s' % name)
        func, arity = fn[name]
        # Check the arity the user actually wrote; otherwise a wrong count
        # would silently steal (or leave behind) operands on the stack.
        if argc != arity:
            raise TypeError(
                '%s() takes %d argument(s) but %d were given'
                % (name, arity, argc)
            )
        # Arguments come off the stack last-to-first.
        args = [evaluate_stack(s, ctx) for _ in range(argc)]
        args.reverse()
        return func(*args)

    if op[0] == '"':
        return op[1:-1]

    if op == 'not':
        return int(not evaluate_stack(s, ctx))

    if op in _BINARY_OPS:
        # The right-hand operand was pushed last, so it is popped first.
        right = evaluate_stack(s, ctx)
        left = evaluate_stack(s, ctx)
        if op in _ARITHMETIC_OPS:
            return opn[op](left, right)
        if op in _COMPARISON_OPS:
            return int(opn[op](left, right))
        # Normalise to 0/1 like ``not`` and the comparisons do; int() of the
        # raw operand would truncate floats and fail on strings.
        if op == 'and':
            return int(bool(left) and bool(right))
        return int(bool(left) or bool(right))

    if op[0].isalpha():
        if op in ctx:
            return ctx[op]
        raise NameError('invalid identifier: %s' % op)

    return float(op)


def eval_ts_query(query_string, ctx=None):
    """Parse ``query_string`` and return the value it evaluates to.

    The language supports numbers, double-quoted strings, identifiers,
    calls of the functions registered in ``fn``, the arithmetic operators
    ``+ - * / ^``, the comparisons ``< <= > >= == !=`` and the keywords
    ``not``, ``and`` and ``or`` (matched as whole words, case-insensitively).

    Args:
        query_string: the expression text.
        ctx: optional mapping of identifier name to value.  Defaults to an
            empty mapping, in which case any bare identifier is an error.

    Raises:
        pyparsing.ParseException: ``query_string`` is not a valid expression.
        NameError, TypeError: see ``evaluate_stack``.
    """
    expr_stack = []

    def push_first(strg, loc, toks):
        # Operands and operators are recorded by their first token only.
        expr_stack.append(toks[0])

    def push_call(strg, loc, toks):
        # toks is [name, Group(arg1), Group(arg2), ...]; parentheses and
        # commas are suppressed, so everything after the name is one
        # argument.
        expr_stack.append((toks[0], len(toks) - 1))

    lpar, rpar, comma = map(Suppress, "(),")
    plus, minus, mult, div = map(Literal, "+-*/")
    # Two-character operators must be tried before their one-character
    # prefixes, otherwise "<" wins and "<=" can never match.
    le, lt, ge, gt, eq, ne = map(Literal, "<= < >= > == !=".split())
    addop = plus | minus
    multop = mult | div
    compop = le | lt | ge | gt | eq | ne
    expop = Literal("^")
    # Keywords rather than literals, so that identifiers such as "note" or
    # "order" are not split into "not e" / "or der".
    not_kw, and_kw, or_kw = map(CaselessKeyword, ("not", "and", "or"))

    fnumber = Regex(r"[+-]?\d+(?:\.\d*)?(?:[eE][+-]?\d+)?")
    string = Regex(r"\"([^\\\"]*)\"")
    ident = Word(alphas, alphanums + "_$")

    expr = Forward()
    fn_call = (
        ident + lpar + Group(expr) + ZeroOrMore(comma + Group(expr)) + rpar
    ).setParseAction(push_call)
    # The parse action is attached to the alternation, not to ``ident``
    # itself, so the function name inside ``fn_call`` is not pushed twice.
    operand = (string | fnumber | ident).setParseAction(push_first)
    atom = fn_call | operand | Group(lpar + expr + rpar)

    factor = Forward()
    factor <<= atom + ZeroOrMore((expop + factor).setParseAction(push_first))
    term = factor + ZeroOrMore((multop + factor).setParseAction(push_first))
    factor_expr = term + ZeroOrMore((addop + term).setParseAction(push_first))
    comparison = factor_expr + ZeroOrMore(
        (compop + factor_expr).setParseAction(push_first)
    )
    not_test = Forward()
    not_test <<= (not_kw + not_test).setParseAction(push_first) | comparison
    and_test = not_test + ZeroOrMore(
        (and_kw + not_test).setParseAction(push_first)
    )
    or_test = and_test + ZeroOrMore(
        (or_kw + and_test).setParseAction(push_first)
    )
    expr <<= or_test

    expr.parseString(query_string, parseAll=True)
    return evaluate_stack(expr_stack, {} if ctx is None else ctx)


def _self_test():
    """Smoke-test the evaluator; run when the file is executed as a script."""
    assert eval_ts_query("int(round(1.0 + 2.9))") == 4
    assert eval_ts_query("1 > 3") == 0
    assert eval_ts_query("3 > 1") == 1
    assert eval_ts_query("1 <= 1") == 1
    assert eval_ts_query("2 >= 3") == 0
    assert eval_ts_query("not 1") == 0
    assert eval_ts_query("1 and 0") == 0
    assert eval_ts_query("2 and 1") == 1
    assert eval_ts_query("2 and 3") == 1
    assert eval_ts_query("1 or 0") == 1
    assert eval_ts_query("0 or 0") == 0
    assert eval_ts_query("float(1)") == 1.0
    assert eval_ts_query('align(300, "max", ts("ping-health-v1.state", "dc=ash silo=prod"))') == 1


if __name__ == "__main__":
    _self_test()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment