Skip to content

Instantly share code, notes, and snippets.

@chancyk
Last active January 25, 2017 05:41
Show Gist options
  • Save chancyk/b2f273f109d8946dd79037cc0c7815ab to your computer and use it in GitHub Desktop.
Save chancyk/b2f273f109d8946dd79037cc0c7815ab to your computer and use it in GitHub Desktop.
Python Parsley OMeta SQL Parser
"""
LICENSE
This Source Code Form is subject to the terms of the Mozilla Public
License, v. 2.0. If a copy of the MPL was not distributed with this
file, You can obtain one at http://mozilla.org/MPL/2.0/.
DESCRIPTION
This is a prototype Parsley implementation of a subset of a MySQL-like SQL syntax.
So far it supports:
* sub-queries inside of the FROM and JOIN expressions.
* dot syntax for referencing fields of tables.
* double-quote style labels within the SELECT expression.
* table aliases as the unquoted token following the table name.
This code will likely not be expanded upon and I've solely made it available
in case someone is interested in doing something similar with Parsley/OMeta,
as I was unable to find any SQL implementations myself.
"""
import re
import string
from pprint import pprint
from collections import namedtuple
from parsley import makeGrammar
# Result of parsing one SELECT statement. Every slot defaults to None so the
# grammar can construct a Query from whichever clauses were actually present.
Query = namedtuple('Query', ['select', 'from_', 'joins', 'where'])
Query.__new__.__defaults__ = (None,) * len(Query._fields)

# One selected column: optional quoted label, optional table qualifier, name.
Field = namedtuple('Field', ['label', 'table', 'field'])

# Base records for a plain table reference and for a parenthesised sub-query.
Table = namedtuple('Table', ['alias', 'table'])
SubQuery = namedtuple('SubQuery', ['alias', 'query'])

# Clause-specific variants that share the field layout of the base records.
JoinTable = namedtuple('JoinTable', Table._fields)
FromQuery = namedtuple('FromQuery', SubQuery._fields)
JoinQuery = namedtuple('JoinQuery', SubQuery._fields)
# Characters the grammar's `label` rule accepts between the double quotes of a
# SELECT label: ASCII letters of either case plus underscore.
LABEL_CHARS = string.ascii_uppercase + string.ascii_lowercase + "_"

# Characters the grammar's `name` rule accepts: lowercase ASCII letters and
# the hyphen.
NAME_CHARS = string.ascii_lowercase + "-"
# Parsley (OMeta-style) grammar source for the supported SQL subset. It is
# compiled by makeGrammar() in the __main__ section of this file, which also
# supplies the names the rules reference from Python: LABEL_CHARS, NAME_CHARS,
# PRINTABLE, set_quoting / unset_quoting / add_to_fields and the namedtuple
# constructors (Query, Field, Table, ...).
# The start rule invoked by the script is `sql_expr`; each of its alternatives
# returns a Query.
# The literal is a raw string, so escapes such as '\n' and '\\' reach the
# grammar compiler verbatim instead of being interpreted by Python first.
# NOTE(review): `ws` is used throughout but never defined here -- presumably a
# rule Parsley provides by default; confirm against the Parsley documentation.
# NOTE(review): every rule-body line is flush-left in this copy; the original
# indentation appears to have been lost -- confirm the layout Parsley expects
# for multi-line rules before running.
SQL_GRAMMAR = r"""
mws = (' ' | '\r' | '\n' | '\t')*
digit = anything:x ?(x in '0123456789')
number = <digit+>:n -> int(n)
name_char = anything:x ?(x in NAME_CHARS)
name = <name_char+>:n <digit?>:d -> n + (d or '')
label_char = anything:x ?(x in LABEL_CHARS)
label = <label_char+>:l -> l
sql_expr =
select_expr:s
ws from_expr:f
( ws join_exprs:j ws where_expr:w
-> Query(select=s, from_=f, joins=j, where=w)
| ws join_exprs:j
-> Query(select=s, from_=f, joins=j)
| ws where_expr:w
-> Query(select=s, from_=f, where=w)
|-> Query(select=s, from_=f)
)
select_expr = 'SELECT' ws fields
field = name:f ~'.' -> Field(label=None, table=None, field=f)
dot_field = name:t '.' name:f -> Field(label=None, table=t, field=f)
field_expr = <field:f | dot_field:f> -> f
unaliased_field_expr =
field_expr:f ~(mws '"')
( ',' mws -> Field(label=None, table=f.table, field=f.field)
| -> Field(label=None, table=f.table, field=f.field)
)
label_start = '"' !(set_quoting())
label_end = '"' !(unset_quoting())
aliased_field_expr =
field_expr:f mws label_start label:l label_end
( ',' mws -> Field(label=l, table=f.table, field=f.field)
| -> Field(label=l, table=f.table, field=f.field)
)
fields =
<(unaliased_field_expr | aliased_field_expr)+:f>
!(add_to_fields(f))
-> f
table = name:t
( ' '+ name:a -> Table(alias=a, table=t)
| -> Table(alias=None, table=t)
)
sub_query =
'(' mws sql_expr:q mws ')' ws name:a
-> SubQuery(alias=a, query=q)
from_expr = 'FROM' ws
( table:t -> Table(**t._asdict())
| sub_query:q -> FromQuery(**q._asdict())
)
join_query_expr = sub_query:q -> JoinQuery(**q._asdict())
join_table_expr =
table:t ws
'ON' ws criteria:c
-> (JoinTable(**t._asdict()), c)
join_type =
( 'INNER JOIN' -> 'inner join'
| 'LEFT JOIN' -> 'left join'
| 'RIGHT JOIN' -> 'right join'
| 'OUTER JOIN' -> 'outer join'
)
join_expr =
join_type:t ws
( join_table_expr:j | join_query_expr:j ) mws
-> (t, j)
join_exprs =
<join_expr+:j> -> j
string_char = anything:x ?(x in PRINTABLE) -> x
string_start = '\'' !(set_quoting())
string_end = '\'' !(unset_quoting())
escaped_char =
'\\' ( ('\\' -> '\\')
| ('n' -> '\n')
| ('r' -> '\r')
| ('t' -> '\t')
| ('\'' -> '\'')
)
string = string_start (escaped_char | ~'\'' string_char)*:c string_end -> ''.join(c)
value = field_expr | string | number
filter_expr = field_expr:f mws '=' mws value:v -> (f, v)
or_expr = 'OR' ws <filter_expr:c | sub_criteria:c> -> c
and_expr = 'AND' ws <filter_expr:c | sub_criteria:c> -> c
bool_op = 'OR' | 'AND'
bool_expr = <or_expr:c ws | and_expr:c ws> -> c
sub_criteria = '(' mws criteria:c mws ')' -> c
more_criteria = (bool_expr+:c | sub_criteria:c) -> c
criteria =
<filter_expr:c | sub_criteria:c> mws
( ~~bool_op ws
more_criteria:mc -> (c, mc)
| -> c
)
where_expr =
'WHERE' ws criteria:c -> c
"""
class SQL_Expression:
    """Mutable side-channel state shared with the grammar during a parse.

    The bound methods of one instance are handed to ``makeGrammar`` so that
    grammar actions can record what they saw:

    * ``fields``  -- values passed to ``add_to_fields`` by the ``fields`` rule.
    * ``trace``   -- argument tuples received by ``traceit``.
    * ``quoting`` -- True between an opening and a closing quote rule; if a
      parse fails while it is still True, a quote was left open.
    """

    def __init__(self):
        self.fields = []
        self.trace = []
        self.quoting = False

    def add_to_fields(self, f):
        """Append ``f`` to the list of recorded field results."""
        self.fields.append(f)

    def set_quoting(self):
        """Mark that an opening quote has been consumed."""
        self.quoting = True

    def unset_quoting(self):
        """Mark that the matching closing quote has been consumed."""
        self.quoting = False

    def was_quoting(self):
        """Return True if the most recent quote opened was never closed."""
        return self.quoting

    def traceit(self, *a):
        """Record the positional arguments of one trace callback as a tuple."""
        self.trace.append(a)
if __name__ == '__main__':
    # Demo: compile the grammar, parse one sample statement and dump each
    # clause of the resulting Query.

    # Only the demo's error handling needs this, so import it locally.
    from parsley import ParseError

    # Holds the side-channel state (fields, trace, quoting flag) that the
    # grammar actions update while parsing.
    sql_expr = SQL_Expression()

    # Names the grammar's Python expressions are allowed to reference.
    bindings = {
        "LABEL_CHARS": LABEL_CHARS,
        "NAME_CHARS": NAME_CHARS,
        "PRINTABLE": string.printable,
        "set_quoting": sql_expr.set_quoting,
        "unset_quoting": sql_expr.unset_quoting,
        "add_to_fields": sql_expr.add_to_fields,
        "Query": Query,
        "Field": Field,
        "Table": Table,
        "JoinTable": JoinTable,
        "SubQuery": SubQuery,
        "FromQuery": FromQuery,
        "JoinQuery": JoinQuery,
    }
    g = makeGrammar(SQL_GRAMMAR, bindings, tracefunc=sql_expr.traceit)

    # Sample statement; lines are joined with newlines and carry no leading
    # or trailing whitespace.
    sql = "\n".join([
        'SELECT t.dog "Dog", t.cat "Cat"',
        'FROM animals a',
        'INNER JOIN ( SELECT dog FROM animals a2 ) jq',
        'LEFT JOIN aliens al ON al.id = jq.id',
        'WHERE ( a = 2 AND d = 3 AND ( e = 4 OR f = 5 ) )',
        'AND b = 2',
        "AND c = 'hello'",
    ])

    try:
        query = g(sql).sql_expr()
    except ParseError as exc:
        # Catch parse failures only: other exceptions have no formatError()
        # and should surface with their own traceback.
        print("Failed to parse grammar:")
        print(exc.formatError())
        # A quote rule opened but never closed is the likely culprit.
        if sql_expr.was_quoting():
            print(" - Check your quotes in the WHERE expression.")
    else:
        print('SELECT')
        pprint(query.select)
        print('FROM')
        pprint(query.from_)
        print('JOINS')
        pprint(query.joins)
        print('WHERE')
        pprint(query.where)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment