Skip to content

Instantly share code, notes, and snippets.

@soasme
Last active December 7, 2018 02:19
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save soasme/1685e80a6f70311dbb801823ac8827fa to your computer and use it in GitHub Desktop.
Save soasme/1685e80a6f70311dbb801823ac8827fa to your computer and use it in GitHub Desktop.
from pyparsing import Literal,CaselessLiteral,Word,Group,Optional,\
ZeroOrMore,Forward,nums,alphas,alphanums,Regex,ParseException,\
CaselessKeyword, Suppress
import math
import operator
def proc_align(window, algorithm, ts):
    """Placeholder implementation of the ``align`` query function.

    Args:
        window: Accepted but not used by this stub.
        algorithm: Accepted but not used by this stub.
        ts: Value handed back to the caller.

    Returns:
        ``ts`` unchanged.
    """
    return ts
def proc_ts(v1, v2):
    """Placeholder implementation of the ``ts`` query function.

    Args:
        v1: Accepted but not used by this stub.
        v2: Accepted but not used by this stub.

    Returns:
        The constant ``1``.
    """
    return 1
# Binary operators keyed by the symbol used in query strings.  Every entry is
# called as opn[symbol](left, right).
opn = {
    "+": operator.add,
    "-": operator.sub,
    "*": operator.mul,
    "/": operator.truediv,
    # Each comparison symbol is bound to the operator it denotes
    # (">" is greater-than, "<" is less-than, and so on).
    ">": operator.gt,
    ">=": operator.ge,
    "<": operator.lt,
    "<=": operator.le,
    "==": operator.eq,
    "!=": operator.ne,
    "^": operator.pow,
}

# Functions callable from a query: name -> (callable, number of arguments
# popped from the evaluation stack).
fn = {
    "int": (int, 1),
    "float": (float, 1),
    "sin": (math.sin, 1),
    "cos": (math.cos, 1),
    "tan": (math.tan, 1),
    "exp": (math.exp, 1),
    "abs": (abs, 1),
    "trunc": (lambda a: int(a), 1),
    "round": (round, 1),
    "align": (proc_align, 3),
    "ts": (proc_ts, 2),
}


def eval_ts_query(query_string):
    """Parse and evaluate a query expression.

    The grammar supports numbers, double-quoted strings, identifiers,
    function calls from ``fn``, parentheses, the arithmetic operators
    ``+ - * / ^``, the comparisons ``< <= > >= == !=`` and the boolean
    operators ``not``, ``and`` and ``or``.  Comparisons and boolean operators
    evaluate to the integers ``1`` (true) or ``0`` (false).

    Parsing pushes tokens onto a stack in postfix order; the stack is then
    evaluated recursively from the top.

    Args:
        query_string: The expression text to evaluate.

    Returns:
        The value of the expression: a float for numeric literals and
        arithmetic, an int for comparisons / boolean operators, a str for a
        string literal, or whatever the called function returns.

    Raises:
        pyparsing.ParseException: If ``query_string`` does not match the
            grammar in full.
        Exception: If an identifier is not a known function or context
            variable, or if operands are left over after evaluation (for
            example a function called with too many arguments).
    """
    expr_stack = []

    arithmetic_ops = {"+", "-", "*", "/", "^"}
    comparison_ops = {"<", ">", ">=", "<=", "==", "!="}

    def push_to_stack(strg, loc, toks):
        # Only the leading token matters: the literal, the operator symbol,
        # or the function name.  Operands were pushed by nested actions.
        expr_stack.append(toks[0])

    def evaluate_stack(s, ctx):
        op = s.pop()
        if op[0] == '"':
            # String literal: strip the surrounding quotes.
            return op[1:-1]
        if op in arithmetic_ops:
            # Operands come off the stack in reverse order.
            op2 = evaluate_stack(s, ctx)
            op1 = evaluate_stack(s, ctx)
            return opn[op](op1, op2)
        if op in comparison_ops:
            op2 = evaluate_stack(s, ctx)
            op1 = evaluate_stack(s, ctx)
            return int(opn[op](op1, op2))
        if op == 'not':
            return int(not evaluate_stack(s, ctx))
        if op == 'and':
            op2 = evaluate_stack(s, ctx)
            op1 = evaluate_stack(s, ctx)
            # Normalise to 0/1 so the result does not leak an operand value
            # (and does not fail on non-numeric operands).
            return int(bool(op1) and bool(op2))
        if op == 'or':
            op2 = evaluate_stack(s, ctx)
            op1 = evaluate_stack(s, ctx)
            return int(bool(op1) or bool(op2))
        if op in fn:
            f, n = fn[op]
            # Arguments were pushed left to right, so they pop right to left.
            args = reversed([evaluate_stack(s, ctx) for _ in range(n)])
            return f(*args)
        if op[0].isalpha():
            if op in ctx:
                return ctx[op]
            raise Exception('invalid identifier: %s' % op)
        return float(op)

    comma = Literal(",")
    plus, minus, mult, div = map(Literal, "+-*/")
    lt, le, gt, ge, eq, ne = map(Literal, "< <= > >= == !=".split())
    lpar, rpar = map(Suppress, "()")
    addop = plus | minus
    multop = mult | div
    # "|" takes the first alternative that matches, so the two-character
    # operators must be tried before their one-character prefixes; otherwise
    # "<=" and ">=" can never be recognised.
    compop = le | lt | ge | gt | eq | ne
    expop = Literal("^")
    fnumber = Regex(r"[+-]?\d+(?:\.\d*)?(?:[eE][+-]?\d+)?")
    string = Regex(r"\"([^\\\"]*)\"")
    ident = Word(alphas, alphanums + "_$")

    expr = Forward()
    atom = (
        (
            string
            | fnumber
            | ident + lpar + expr + ZeroOrMore(comma + expr) + rpar
            | ident
        ).setParseAction(push_to_stack)
        | Group(lpar + expr + rpar)
    )
    # Exponentiation recurses on the right-hand side (right-associative).
    factor = Forward()
    factor <<= atom + ZeroOrMore((expop + factor).setParseAction(push_to_stack))
    term = factor + ZeroOrMore((multop + factor).setParseAction(push_to_stack))
    factor_expr = term + ZeroOrMore((addop + term).setParseAction(push_to_stack))
    comparison = factor_expr + ZeroOrMore(
        (compop + factor_expr).setParseAction(push_to_stack)
    )
    # NOTE: Literal matches a bare prefix, so input such as "nothing" is
    # tokenised as the operator "not" followed by the identifier "hing".
    not_test = Forward()
    not_test <<= (
        (Literal("not") + not_test).setParseAction(push_to_stack) | comparison
    )
    and_test = not_test + ZeroOrMore(
        (Literal('and') + not_test).setParseAction(push_to_stack)
    )
    or_test = and_test + ZeroOrMore(
        (Literal('or') + and_test).setParseAction(push_to_stack)
    )
    expr <<= or_test
    bnf = expr

    bnf.parseString(query_string, parseAll=True)
    result = evaluate_stack(expr_stack, {})
    if expr_stack:
        # Something was parsed but never consumed, e.g. a function called
        # with more arguments than it accepts.
        raise Exception('unused operands in query: %s' % query_string)
    return result


assert eval_ts_query("int(round(1.0 + 2.9))") == 4
assert eval_ts_query("1 > 3") == 0
assert eval_ts_query("3 > 1") == 1
assert eval_ts_query("1 < 3") == 1
assert eval_ts_query("1 <= 3") == 1
assert eval_ts_query("3 >= 3") == 1
assert eval_ts_query("1 == 1") == 1
assert eval_ts_query("1 != 1") == 0
assert eval_ts_query("not 1") == 0
assert eval_ts_query("1 and 0") == 0
assert eval_ts_query("2 and 1") == 1
assert eval_ts_query("1 and 2") == 1
assert eval_ts_query("1 or 0") == 1
assert eval_ts_query("2 or 0") == 1
assert eval_ts_query("0 or 0") == 0
assert eval_ts_query("float(1)") == 1.0
assert eval_ts_query('align(300, "max", ts("ping-health-v1.state", "dc=ash silo=prod"))') == 1
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment