Skip to content

Instantly share code, notes, and snippets.

@kainoa21
Last active February 1, 2024 20:48
Show Gist options
Save kainoa21/f3d01c607fce2741cef22683048a22a3 to your computer and use it in GitHub Desktop.
PyIceberg + DuckDB + SQLGlot
import os

from pyiceberg import catalog, expressions
from sqlglot import exp, parse_one
from sqlglot.optimizer.scope import build_scope
# convert sqlmesh columns and literals into pyiceberg column names and literals
def sqlglot_convert_column_or_literal_to_pyiceberg(expression):
match type(expression):
case exp.Literal:
return expression.this if expression.is_string else int(expression.this)
case exp.Column:
return expression.this.this
case _:
print(f'Unable to convert column or literal {expression} with type {type(expression)}')
# convert a sqlglot predicate to a pyiceberg boolean expression
def sqlglot_expression_to_pyiceberg_boolean(expression):
    """Convert a sqlglot predicate into a pyiceberg row-filter expression.

    Only ``column <op> literal`` and ``literal <op> column`` comparisons
    (=, <>, >, >=, <, <=) are translated. The literal-first form is mirrored
    so the column is always the pyiceberg term. ``AND``/``OR`` are delegated
    to ``sqlglot_connector_to_pyiceberg_boolean``. Surrounding parentheses
    are ignored.

    Anything else is widened to ``expressions.AlwaysTrue()`` after printing
    a diagnostic. Examples are column-vs-column, literal-vs-literal,
    unsupported operators and unconvertible literals. The widening only
    makes the filter admit more rows. ``execute`` re-runs the full original
    query (including its WHERE) in DuckDB after the scan, so results stay
    correct.

    Args:
        expression: a sqlglot boolean expression node.

    Returns:
        A pyiceberg ``BooleanExpression``.
    """
    # Parentheses carry no meaning for the filter; look through them.
    while isinstance(expression, exp.Paren):
        expression = expression.this
    if isinstance(expression, exp.Connector):
        return sqlglot_connector_to_pyiceberg_boolean(expression)

    # sqlglot comparison -> (predicate for "col op lit", predicate for "lit op col")
    comparisons = {
        exp.EQ: (expressions.EqualTo, expressions.EqualTo),
        exp.NEQ: (expressions.NotEqualTo, expressions.NotEqualTo),
        exp.GT: (expressions.GreaterThan, expressions.LessThan),
        exp.GTE: (expressions.GreaterThanOrEqual, expressions.LessThanOrEqual),
        exp.LT: (expressions.LessThan, expressions.GreaterThan),
        exp.LTE: (expressions.LessThanOrEqual, expressions.GreaterThanOrEqual),
    }
    predicates = comparisons.get(type(expression))
    if predicates is None:
        print(f'Unable to convert boolean expression {expression} with type {type(expression)}')
        return expressions.AlwaysTrue()

    left, right = expression.this, expression.expression
    left_is_column = isinstance(left, exp.Column)
    right_is_column = isinstance(right, exp.Column)
    if left_is_column == right_is_column:
        # Exactly one side must be a column. pyiceberg treats the second
        # argument as a literal, so "a = b" would wrongly become "a = 'b'".
        print(f'Unable to convert boolean expression {expression} with type {type(expression)}')
        return expressions.AlwaysTrue()

    if left_is_column:
        predicate, column_node, value_node = predicates[0], left, right
    else:
        # "lit op col" is rewritten as "col mirrored-op lit".
        predicate, column_node, value_node = predicates[1], right, left

    value = sqlglot_convert_column_or_literal_to_pyiceberg(value_node)
    if value is None:
        return expressions.AlwaysTrue()
    return predicate(sqlglot_convert_column_or_literal_to_pyiceberg(column_node), value)
def sqlglot_connector_to_pyiceberg_boolean(expression):
left = sqlglot_expression_to_pyiceberg_boolean(expression.this)
right = sqlglot_expression_to_pyiceberg_boolean(expression.expression)
match type(expression):
case exp.And:
return expressions.And(left, right)
case exp.Or:
return expressions.Or(left, right)
case _:
print(f'Unable to convert connector expression {expression} with type {type(expression)}')
# Parse sql to load the table and run a scan based on query predicates
def execute(iceberg_catalog, sql):
    """Run ``sql`` against an Iceberg table using a pyiceberg scan plus DuckDB.

    The query's table is loaded from ``iceberg_catalog`` and scanned with the
    WHERE clause pushed down as a pyiceberg row filter. Pushdown happens only
    when the outermost SELECT reads directly from that single table. The
    scan is registered in DuckDB under the bare table name, and the original
    query is then run there with the table reference rewritten. DuckDB
    re-applies the full WHERE, so the pushed-down filter only needs to keep
    a superset of the matching rows.

    Args:
        iceberg_catalog: a pyiceberg catalog exposing ``load_table``.
        sql: a query whose table is namespace-qualified, e.g.
            ``SELECT ... FROM db.tbl WHERE ...``.

    Returns:
        The result of DuckDB's ``con.execute`` on the rewritten query.

    Raises:
        ValueError: if the query references no table, or the table is not
            qualified with a namespace.
    """
    # parse sql into a sqlglot ast
    ast = parse_one(sql)
    root = build_scope(ast)

    # The outer WHERE describes the table's own columns only when the
    # outermost query reads from exactly that one table. With joins,
    # subqueries or CTEs it may reference other sources or aliases, so scan
    # everything and let DuckDB do the filtering.
    sources = list(root.sources.values()) if root is not None else []
    if len(sources) == 1 and isinstance(sources[0], exp.Table):
        table = sources[0]
        where = root.expression.args.get('where')
    else:
        table = ast.find(exp.Table)
        where = None

    if table is None:
        raise ValueError(f'No table found in query: {sql}')
    if not table.db:
        raise ValueError(f'Table {table.sql()} must be qualified with a namespace, e.g. db.table')

    # load the iceberg table
    iceberg_table = iceberg_catalog.load_table((table.db, table.name))

    # convert the sqlglot where clause into a pyiceberg row filter
    row_filter = sqlglot_expression_to_pyiceberg_boolean(where.this) if where else expressions.AlwaysTrue()
    scan = iceberg_table.scan(row_filter=row_filter)

    # register with DuckDB under the bare table name, then run the original
    # query with its (qualified) table reference rewritten to that name.
    # Names come from the parsed SQL so the replacement key always matches.
    con = scan.to_duckdb(table_name=table.name)
    rewritten = exp.replace_tables(ast, {exp.table_name(table): table.name})
    return con.execute(rewritten.sql(dialect='duckdb'))
# Load the Tabular-hosted REST catalog and run an example query against it.
# The credential is read from the TABULAR_CREDENTIAL environment variable
# instead of being hard-coded in source; the old placeholder is the fallback.
iceberg_catalog = catalog.load_catalog(
    "sandbox",
    uri="https://api.tabular.io/ws",
    credential=os.environ.get("TABULAR_CREDENTIAL", "t-<redacted>"),
)
# run some sql and show the result instead of discarding it
rows = execute(
    iceberg_catalog,
    "SELECT count(*) from examples.nyc_taxi_yellow where pickup_time >= '2021-03-01T00:00:00+00:00' and pickup_time < '2021-04-01T00:00:00+00:00'",
).fetchall()
print(rows)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment