Skip to content

Instantly share code, notes, and snippets.

@mathewmoon
Created July 24, 2019 21:57
Show Gist options
  • Save mathewmoon/737ad52e4ef158daac59b3f00c3fd52b to your computer and use it in GitHub Desktop.
Save mathewmoon/737ad52e4ef158daac59b3f00c3fd52b to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3.7
import sys
import re
import base64
import sys
from jsonpath_ng import jsonpath, parse
from sly import Lexer, Parser
class DynamoLex(Lexer):
tokens = [
'EQ',
'NE',
'LT',
'LT_OR_EQ',
'GT',
'GT_OR_EQ',
'IN',
'BETWEEN',
'EXISTS',
'NOT_EXISTS',
'BEGINS_WITH',
'L_PARENTH',
'R_PARENTH',
'NOT',
'AND',
'OR',
'STRING',
'PATH',
'INT',
'FLOAT',
'SIZE'
]
message = {}
EQ = r'='
NE = r'<>'
LT = r'<'
LT_OR_EQ = r'<='
GT = r'>'
GT_OR_EQ = r'>='
IN = r'IN'
BETWEEN = r'BETWEEN'
EXISTS = r'^attribute_exists\(.+\)$'
NOT_EXISTS = r'^attribute_not_exists\(.+\)$'
BEGINS_WITH = r'begins_with\(.+\)'
L_PARENTH = r'\('
R_PARENTH = r'\)'
NOT = r'NOT'
AND = r'AND'
OR = r'OR'
SIZE = r'size\(.+\)'
ignore = ' \t'
def __init__(self, message):
self.message = message
@_(r'\d+')
def INT(self, t):
try:
t.value = int(t.value)
except ValueError:
print("Invalid type for " % t.value)
t.value = 0
return t
@_(r'(\d*)?\.\d+')
def FLOAT(self,t):
try:
t.value = float(t.value)
except ValueError:
print("Invalid type for " % t.value)
t.value = 0
return t
@_(r'([a-zA-Z_\-0-9]+(\[[0-9]*\])?\.?)+')
def PATH(self, t):
# We want PATH to actually be the value of the JSON path
try:
t.value = self.getPathValue(t.value)
except Exception as e:
print("Path Error")
print(e)
return t
@_(r'(\'|").+(\'|")')
def STRING(self, t):
t.value = str(re.sub('\'|\"', '', t.value))
return t
def error(self, t):
print("Illegal character '%s'" % t.value[0])
def getPathValue(self, path):
path = parse(path)
try:
v = [ match.value for match in path.find(self.message) ][0]
if not v:
return False
return v
#return [ match.value for match in path.find(self.message) ]
except Exception as e:
print(e)
return False
class DynamoParse(Parser):
if '--debug' in sys.argv:
debugfile = '/dev/stdout'
tokens = DynamoLex.tokens
message = DynamoLex.message
precedence = (
('left', 'OR', 'AND'),
('right', 'NOT'),
('left', 'EXISTS', 'NOT_EXISTS', 'BEGINS_WITH'),
('left', 'BETWEEN'),
('left', 'IN'),
('left', 'L_PARENTH','R_PARENTH'),
('left', 'EQ', 'NE', 'LT', 'LT_OR_EQ', 'GT', 'GT_OR_EQ')
)
def __init__(self):
self.message = message
def getPathValue(self, path):
path = parse(path)
try:
v = [match.value for match in path.find(self.message)][0]
if not v:
return False
return v
#return [ match.value for match in path.find(self.message) ]
except Exception as e:
print(e)
return False
def isBase64(self, value):
try:
return base64.b64encode(base64.b64decode(value)) == value
except Exception:
return False
@_('expr EQ expr')
def expr(self, p):
#First token is the path
return p[0] == p[2]
@_('expr NE expr')
def expr(self, p):
return p[0] != p[2]
@_('expr LT expr')
def expr(self, p):
return p[0] < p[2]
@_('expr LT_OR_EQ expr')
def expr(self, p):
return p[0] <= p[2]
@_('expr GT expr')
def expr(self, p):
return p[0] > p[2]
@_('expr GT_OR_EQ expr')
def expr(self, p):
return p[0] >= p[2]
@_('expr IN expr')
def expr(self, p):
return p[0] in p[2]
@_('expr BETWEEN expr')
def expr(self, p):
return (p[0] <= p[2] and p[0] >= p[4]) or (p[0] <= p[4] and p[0] >= p[2])
@_('EXISTS')
def expr(self, p):
# if self.getPathValue returns any non False value then the path must exist
return DynamoLex.getPathValue(re.sub('^attribute_exists\(\'|")|(\'|")\)', '', p[0]))
@_('NOT_EXISTS')
def expr(self, p):
# same as above but return values are reversed
return not DynamoLex.getPathValue(re.sub('^attribute_not_exists\(\'|")|(\'|")\)', '', p[0]))
@_('BEGINS_WITH')
def expr(self, p):
args = p = re.sub('(begins_with)|(\s*)|(\'|")|\(|\)', '', p[0]).split(',')
path = args[0]
s = args[1]
return self.getPathValue(path).startswith(s)
@_('SIZE')
def expr(self, p):
v = DynamoLex.getPathValue(re.sub('^size\(\'|\'\)', '', p[0]))
if (type(v) is int) or (type(v) is float):
v = str(v)
if type(v) is bool:
return 0
if self.isBase64(v):
return (len(v) * 3) / 4
# Default. Matches: lists, dicts, strings, strings
return len(v)
@_('L_PARENTH expr R_PARENTH')
def expr(self, p):
return ( p[1] )
@_('expr OR expr')
def expr(self, p):
return p[0] or p[2]
@_('expr AND expr')
def expr(self, p):
return p[0] and p[2]
@_('NOT expr')
def expr(self, p):
return not p[1]
@_('FLOAT')
def expr(self,p):
return p[0]
@_('INT')
def expr(self,p):
return p[0]
@_('STRING')
def expr(self,p):
return p[0]
@_('PATH')
def expr(self,p):
return p[0]
message = {"body": {"bar": "baz", "baz": [1, 2, 3, 4, 5, 6, 7, 8], "foo": "bar", "1": "test"}}
#qry = "bar IN body AND body.baz[0]=1"
lexer = DynamoLex(message)
parser = DynamoParse()
while True:
try:
text = input('QUERY > ')
result = parser.parse(lexer.tokenize(text))
print(result)
except EOFError:
break
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment