Last active
February 1, 2024 20:48
-
-
Save kainoa21/f3d01c607fce2741cef22683048a22a3 to your computer and use it in GitHub Desktop.
PyIceberg + DuckDB + SqlGlot
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from sqlglot import exp, parse_one | |
from sqlglot.optimizer.scope import build_scope | |
from pyiceberg import catalog, expressions | |
def sqlglot_convert_column_or_literal_to_pyiceberg(expression):
    """Convert a sqlglot column or literal node into a plain pyiceberg operand.

    Args:
        expression: A sqlglot expression node. ``exp.Column`` and
            ``exp.Literal`` are the supported kinds.

    Returns:
        The column name for a column, the literal text for a string literal,
        an ``int`` (or ``float`` when the text is not an integer) for a
        numeric literal, or ``None`` after printing a diagnostic when the
        node is neither a column nor a literal.
    """
    if isinstance(expression, exp.Literal):
        if expression.is_string:
            return expression.this
        # A numeric literal keeps its source text in ``this``; int() alone
        # raises ValueError on text such as "1.5", so fall back to float().
        try:
            return int(expression.this)
        except ValueError:
            return float(expression.this)
    if isinstance(expression, exp.Column):
        # Column -> Identifier -> raw name string.
        return expression.this.this
    print(f'Unable to convert column or literal {expression} with type {type(expression)}')
    return None
def sqlglot_expression_to_pyiceberg_boolean(expression):
    """Convert a sqlglot predicate into a pyiceberg boolean expression.

    AND/OR nodes are delegated to ``sqlglot_connector_to_pyiceberg_boolean``.
    Comparisons (=, <>, >, >=, <, <=) between one column and one literal are
    turned into the matching pyiceberg predicate, with the column always
    passed as the first argument.

    Anything that cannot be expressed this way (unsupported operators,
    column-to-column or literal-to-literal comparisons) prints a diagnostic
    and yields ``AlwaysTrue()``. That only widens the scan; ``execute``
    still runs the complete original query in DuckDB over the scanned rows.

    Args:
        expression: The sqlglot expression found inside a WHERE clause.

    Returns:
        A pyiceberg boolean expression.
    """
    # Parentheses only group; the predicate is the node they wrap.
    while isinstance(expression, exp.Paren):
        expression = expression.this

    if isinstance(expression, exp.Connector):
        return sqlglot_connector_to_pyiceberg_boolean(expression)

    # Per operator: (predicate for `column OP literal`,
    #                predicate for `literal OP column`, i.e. the mirrored one).
    comparisons = {
        exp.EQ: (expressions.EqualTo, expressions.EqualTo),
        exp.NEQ: (expressions.NotEqualTo, expressions.NotEqualTo),
        exp.GT: (expressions.GreaterThan, expressions.LessThan),
        exp.GTE: (expressions.GreaterThanOrEqual, expressions.LessThanOrEqual),
        exp.LT: (expressions.LessThan, expressions.GreaterThan),
        exp.LTE: (expressions.LessThanOrEqual, expressions.GreaterThanOrEqual),
    }
    predicates = comparisons.get(type(expression))

    if predicates is not None:
        column_first, literal_first = predicates
        lhs = expression.this
        rhs = expression.expression
        if isinstance(lhs, exp.Column) and isinstance(rhs, exp.Literal):
            return column_first(
                sqlglot_convert_column_or_literal_to_pyiceberg(lhs),
                sqlglot_convert_column_or_literal_to_pyiceberg(rhs),
            )
        if isinstance(lhs, exp.Literal) and isinstance(rhs, exp.Column):
            # `5 < col` is the same predicate as `col > 5`.
            return literal_first(
                sqlglot_convert_column_or_literal_to_pyiceberg(rhs),
                sqlglot_convert_column_or_literal_to_pyiceberg(lhs),
            )

    print(f'Unable to convert boolean expression {expression} with type {type(expression)}')
    return expressions.AlwaysTrue()
def sqlglot_connector_to_pyiceberg_boolean(expression):
    """Convert a sqlglot AND/OR node into the matching pyiceberg expression.

    Both operands are converted with
    ``sqlglot_expression_to_pyiceberg_boolean``. An operand that comes back
    as ``None`` is replaced by ``AlwaysTrue()``, and a connector other than
    AND/OR prints a diagnostic and yields ``AlwaysTrue()``. Both fallbacks
    can only widen the filter, never drop rows the query needs.

    Args:
        expression: A sqlglot ``exp.Connector`` node.

    Returns:
        A pyiceberg boolean expression.
    """
    if isinstance(expression, exp.And):
        combine = expressions.And
    elif isinstance(expression, exp.Or):
        combine = expressions.Or
    else:
        print(f'Unable to convert connector expression {expression} with type {type(expression)}')
        return expressions.AlwaysTrue()

    left = sqlglot_expression_to_pyiceberg_boolean(expression.this)
    right = sqlglot_expression_to_pyiceberg_boolean(expression.expression)

    # Never hand None to And/Or; treat an unconverted side as "no restriction".
    if left is None:
        left = expressions.AlwaysTrue()
    if right is None:
        right = expressions.AlwaysTrue()
    return combine(left, right)
def execute(iceberg_catalog, sql):
    """Run ``sql`` against an Iceberg table through a filtered scan and DuckDB.

    The table referenced by the query is loaded from ``iceberg_catalog``, the
    WHERE clause (if any) is converted into a pyiceberg row filter for the
    scan, the scan is registered in DuckDB, and the original query is then
    executed there with the qualified table name replaced by the registered
    name.

    Args:
        iceberg_catalog: Catalog object providing ``load_table``.
        sql: Query text referencing one table as ``<db>.<table>``.

    Returns:
        Whatever ``con.execute(...)`` returns for the rewritten query.

    Raises:
        ValueError: If the query has no table reference or the table is not
            qualified with a database/namespace.
    """
    # parse sql into a sqlglot ast
    ast = parse_one(sql)
    root = build_scope(ast)

    # grab the table from the ast
    table = ast.find(exp.Table)
    if table is None:
        raise ValueError(f"No table reference found in query: {sql}")
    db = table.args.get('db')
    if db is None:
        raise ValueError(f"Table must be qualified as <db>.<table> in query: {sql}")

    # load the iceberg table
    iceberg_table = iceberg_catalog.load_table((db.this, table.this.this))

    # retrieve the where clause if it exists and convert it into a row filter
    where = root.expression.args.get('where')
    row_filter = sqlglot_expression_to_pyiceberg_boolean(where.this) if where else None
    if row_filter is None:
        # No WHERE clause, or nothing convertible: scan without restriction.
        row_filter = expressions.AlwaysTrue()

    # run the pyiceberg table scan
    scan = iceberg_table.scan(row_filter=row_filter)

    # Index from the end so the lookup does not depend on how many leading
    # parts the identifier tuple carries.
    identifier = iceberg_table.name()
    namespace = identifier[-2]
    table_name = identifier[-1]

    # register with DuckDb
    con = scan.to_duckdb(table_name=table_name)

    # execute original query with the iceberg table name substituted
    rewritten = exp.replace_tables(ast, {f"{namespace}.{table_name}": table_name})
    return con.execute(rewritten.sql())
# connect to the "sandbox" iceberg catalog
iceberg_catalog = catalog.load_catalog(
    "sandbox",
    uri="https://api.tabular.io/ws",
    credential="t-<redacted>",
)
# run an example query against it and fetch every result row
execute(
    iceberg_catalog,
    "SELECT count(*) from examples.nyc_taxi_yellow "
    "where pickup_time >= '2021-03-01T00:00:00+00:00' "
    "and pickup_time < '2021-04-01T00:00:00+00:00'",
).fetchall()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment