Created
July 24, 2019 21:57
-
-
Save mathewmoon/737ad52e4ef158daac59b3f00c3fd52b to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3.7 | |
import sys | |
import re | |
import base64 | |
import sys | |
from jsonpath_ng import jsonpath, parse | |
from sly import Lexer, Parser | |
class DynamoLex(Lexer):
    """Tokenizer for DynamoDB-style condition expressions.

    The lexer is bound to a ``message`` (a JSON-like dict).  ``PATH`` tokens
    are resolved against that message while lexing, so the token value is the
    data found at the path rather than the path text itself.

    SLY tries token patterns in the order they are defined in the class body,
    so longer operators are declared before the shorter operators that are
    their prefixes, and ``FLOAT`` is declared before ``INT``.
    """

    tokens = [
        'EQ',
        'NE',
        'LT',
        'LT_OR_EQ',
        'GT',
        'GT_OR_EQ',
        'IN',
        'BETWEEN',
        'EXISTS',
        'NOT_EXISTS',
        'BEGINS_WITH',
        'L_PARENTH',
        'R_PARENTH',
        'NOT',
        'AND',
        'OR',
        'STRING',
        'PATH',
        'INT',
        'FLOAT',
        'SIZE',
    ]

    # Class-level default document; each instance gets its own in __init__.
    message = {}

    # Two-character operators first, otherwise '<' / '>' would win and
    # '<=' / '>=' could never be produced.
    NE = r'<>'
    LT_OR_EQ = r'<='
    GT_OR_EQ = r'>='
    EQ = r'='
    LT = r'<'
    GT = r'>'

    # Keywords end on a word boundary so identifiers that merely start with a
    # keyword (e.g. "ORDER", "INDEX") fall through to PATH.
    IN = r'IN\b'
    BETWEEN = r'BETWEEN\b'
    NOT = r'NOT\b'
    AND = r'AND\b'
    OR = r'OR\b'

    # Function calls are lexed as a single token including their argument
    # list.  The argument list stops at the first ')' so a call does not
    # swallow the rest of the expression; arguments containing ')' are
    # therefore not supported.
    EXISTS = r'attribute_exists\([^)]+\)'
    NOT_EXISTS = r'attribute_not_exists\([^)]+\)'
    BEGINS_WITH = r'begins_with\([^)]+\)'
    SIZE = r'size\([^)]+\)'

    L_PARENTH = r'\('
    R_PARENTH = r'\)'

    ignore = ' \t'

    def __init__(self, message):
        """Bind the lexer to ``message``, the document PATH tokens read from."""
        self.message = message

    # FLOAT must be tried before INT, otherwise "1.5" lexes as INT(1), FLOAT(.5).
    @_(r'\d*\.\d+')
    def FLOAT(self, t):
        """Convert the matched text to a ``float``."""
        t.value = float(t.value)
        return t

    @_(r'\d+')
    def INT(self, t):
        """Convert the matched digits to an ``int``."""
        t.value = int(t.value)
        return t

    @_(r'([a-zA-Z_\-0-9]+(\[[0-9]*\])?\.?)+')
    def PATH(self, t):
        """Replace the path text with the value found at that path.

        If the path cannot be parsed or resolved, the error is printed and
        the token keeps its raw text.
        """
        # We want PATH to actually be the value of the JSON path
        try:
            t.value = self.getPathValue(t.value)
        except Exception as e:
            print("Path Error")
            print(e)
        return t

    # A literal ends at the first matching closing quote, so two literals on
    # one line are two tokens.
    @_(r'"[^"]*"|\'[^\']*\'')
    def STRING(self, t):
        """Strip only the delimiting quotes; embedded quotes are kept."""
        t.value = t.value[1:-1]
        return t

    def error(self, t):
        """Report an illegal character and skip it.

        The index must be advanced here; otherwise the lexer would see the
        same character again and never terminate.
        """
        print("Illegal character '%s'" % t.value[0])
        self.index += 1

    def getPathValue(self, path):
        """Return the first value matching ``path`` in ``self.message``.

        Returns ``False`` when nothing matches, when the lookup raises, or
        when the matched value is itself falsy (``0``, ``""``, ``[]``, ...).
        An unparsable ``path`` raises whatever ``jsonpath_ng.parse`` raises.
        """
        expression = parse(path)
        try:
            matches = expression.find(self.message)
        except Exception as e:
            print(e)
            return False
        if not matches:
            return False
        return matches[0].value or False
class DynamoParse(Parser):
    """Evaluator for DynamoDB-style condition expressions.

    Every grammar rule returns the *value* of the sub-expression it matched,
    so parsing a token stream yields the result of the condition directly.
    Function tokens (``attribute_exists(...)``, ``begins_with(...)``,
    ``size(...)``) arrive as a single token holding the whole call text and
    are resolved against ``self.message``.
    """

    # With --debug on the command line, SLY writes its grammar/table
    # debugging output to this file.
    if '--debug' in sys.argv:
        debugfile = '/dev/stdout'

    tokens = DynamoLex.tokens
    message = DynamoLex.message

    # Lowest to highest.  AND binds tighter than OR, so
    # "a OR b AND c" is evaluated as "a OR (b AND c)".
    precedence = (
        ('left', 'OR'),
        ('left', 'AND'),
        ('right', 'NOT'),
        ('left', 'EXISTS', 'NOT_EXISTS', 'BEGINS_WITH'),
        ('left', 'BETWEEN'),
        ('left', 'IN'),
        ('left', 'L_PARENTH', 'R_PARENTH'),
        ('left', 'EQ', 'NE', 'LT', 'LT_OR_EQ', 'GT', 'GT_OR_EQ'),
    )

    def __init__(self, message=None):
        """Bind the parser to the document that function tokens read from.

        Args:
            message: JSON-like dict.  When omitted, a module-level variable
                named ``message`` is used if one exists (this is what the
                constructor historically relied on), otherwise the class
                default.
        """
        if message is None:
            message = globals().get('message', DynamoLex.message)
        self.message = message

    def getPathValue(self, path):
        """Return the first value matching ``path`` in ``self.message``.

        Returns ``False`` when nothing matches, when the lookup raises, or
        when the matched value is itself falsy.  An unparsable ``path``
        raises whatever ``jsonpath_ng.parse`` raises.
        """
        expression = parse(path)
        try:
            matches = expression.find(self.message)
        except Exception as e:
            print(e)
            return False
        if not matches:
            return False
        return matches[0].value or False

    def isBase64(self, value):
        """Return True if ``value`` (``str`` or ``bytes``) is canonical base64.

        ``base64.b64encode`` returns ``bytes``, so a ``str`` input is encoded
        first; comparing the raw ``str`` to ``bytes`` would always be False.
        """
        try:
            raw = value.encode('ascii') if isinstance(value, str) else value
            return base64.b64encode(base64.b64decode(raw, validate=True)) == raw
        except (ValueError, TypeError):
            return False

    def _call_args(self, text, maxsplit=-1):
        """Split ``name(a, 'b')`` into its arguments, unquoted and trimmed."""
        inner = text[text.index('(') + 1:text.rindex(')')]
        return [part.strip().strip('\'"') for part in inner.split(',', maxsplit)]

    def _path_exists(self, path):
        """Return True if ``path`` matches anything, even a falsy value."""
        return bool(parse(path).find(self.message))

    @_('expr EQ expr')
    def expr(self, p):
        return p[0] == p[2]

    @_('expr NE expr')
    def expr(self, p):
        return p[0] != p[2]

    @_('expr LT expr')
    def expr(self, p):
        return p[0] < p[2]

    @_('expr LT_OR_EQ expr')
    def expr(self, p):
        return p[0] <= p[2]

    @_('expr GT expr')
    def expr(self, p):
        return p[0] > p[2]

    @_('expr GT_OR_EQ expr')
    def expr(self, p):
        return p[0] >= p[2]

    @_('expr IN expr')
    def expr(self, p):
        # The right-hand side may be False (unresolved path) or a scalar,
        # neither of which supports membership tests.
        try:
            return p[0] in p[2]
        except TypeError:
            return False

    # The bounds are plain operands rather than full expressions so that the
    # AND inside "a BETWEEN x AND y" is not ambiguous with the logical AND.
    @_('expr BETWEEN operand AND operand')
    def expr(self, p):
        value, first, second = p[0], p[2], p[4]
        # The bounds are accepted in either order.
        return (first <= value <= second) or (second <= value <= first)

    @_('EXISTS')
    def expr(self, p):
        return self._path_exists(self._call_args(p[0])[0])

    @_('NOT_EXISTS')
    def expr(self, p):
        return not self._path_exists(self._call_args(p[0])[0])

    @_('BEGINS_WITH')
    def expr(self, p):
        args = self._call_args(p[0], 1)
        if len(args) != 2:
            raise ValueError("begins_with expects (path, prefix): %s" % p[0])
        path, prefix = args
        value = self.getPathValue(path)
        # An unresolved path yields False, which has no startswith().
        return isinstance(value, str) and value.startswith(prefix)

    @_('SIZE')
    def expr(self, p):
        v = self.getPathValue(self._call_args(p[0])[0])
        # bool must be tested before int: it covers the False returned for an
        # unresolved path, and isinstance(True, int) is also True.
        if isinstance(v, bool):
            return 0
        if isinstance(v, (int, float)):
            return len(str(v))
        # NOTE(review): any string that happens to be valid base64 is treated
        # as binary and measured by its decoded length -- confirm this
        # heuristic is wanted for plain text attributes.
        if isinstance(v, (str, bytes)) and self.isBase64(v):
            return len(base64.b64decode(v))
        # Default. Matches: lists, dicts, strings
        return len(v)

    @_('L_PARENTH expr R_PARENTH')
    def expr(self, p):
        return p[1]

    @_('expr OR expr')
    def expr(self, p):
        return p[0] or p[2]

    @_('expr AND expr')
    def expr(self, p):
        return p[0] and p[2]

    @_('NOT expr')
    def expr(self, p):
        return not p[1]

    @_('operand')
    def expr(self, p):
        return p[0]

    @_('FLOAT', 'INT', 'STRING', 'PATH')
    def operand(self, p):
        # Literal values and already-resolved path values pass through as-is.
        return p[0]
# Sample document the interactive queries are evaluated against.
message = {"body": {"bar": "baz", "baz": [1, 2, 3, 4, 5, 6, 7, 8], "foo": "bar", "1": "test"}}

# Example query: bar IN body AND body.baz[0]=1
lexer = DynamoLex(message)
parser = DynamoParse()


def main():
    """Read queries from stdin and print each result until end of input."""
    while True:
        try:
            text = input('QUERY > ')
        except EOFError:
            break
        if not text.strip():
            continue
        # One bad query (e.g. comparing a string with a number) should not
        # end the whole session, so report the error and keep prompting.
        try:
            print(parser.parse(lexer.tokenize(text)))
        except Exception as err:
            print("Error evaluating query: %s" % err)


# Only start the prompt when run as a script, so importing this module for
# its classes does not block on input().
if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment