@hannes
Last active August 24, 2025 13:49
import pyspark
# DuckLake catalog metadata lives in this Postgres database
ducklake_uri = 'postgres:dbname=ducklake'
table_name = 'lineitem'
# still TODO:
# read a specific snapshot (see the sketch at the end of this file)
# Spark session with the DuckDB JDBC driver pulled in as a Maven package
spark = (pyspark.sql.SparkSession.builder
         .config('spark.jars.packages', 'org.duckdb:duckdb_jdbc:1.3.1.0')
         .config('spark.driver.defaultJavaOptions', '-Djava.security.manager=allow')
         .getOrCreate())
# name of the synthetic column used to split the JDBC read into partitions
index_col_name = '__ducklake_file_index'
def jdbc_setup():
    # DataFrameReader pointed at the DuckLake catalog through the DuckDB JDBC driver
    return (spark.read.format('jdbc')
            .option('driver', 'org.duckdb.DuckDBDriver')
            .option('url', f'jdbc:duckdb:ducklake:{ducklake_uri}'))
# Compute min/max/count of file_index to use as the JDBC partitioning bounds
partitioning_info = (
    jdbc_setup().option('query', f'''
        SELECT
            min(file_index::BIGINT)::STRING min_index,
            (max(file_index::BIGINT)+1)::STRING max_index,
            count(DISTINCT file_index::BIGINT)::STRING num_files
        FROM "{table_name}"''').load().collect()[0])
# Wrap the table in a subquery that exposes file_index as the partition column,
# so Spark can split the scan into one JDBC read per DuckLake data file
table_wrapper = (jdbc_setup()
    .option('dbtable', f'(SELECT *, file_index::BIGINT {index_col_name} FROM "{table_name}") "{table_name}"')
    .option('partitionColumn', index_col_name)
    .option('lowerBound', partitioning_info['min_index'])
    .option('upperBound', partitioning_info['max_index'])
    .option('numPartitions', partitioning_info['num_files'])
    .load())
table_wrapper.createOrReplaceTempView(table_name)
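# Optional sanity check (not part of the original flow): with the bounds above,
# Spark should report roughly one partition per DuckLake data file, assuming
# the file indexes are contiguous.
print(table_wrapper.rdd.getNumPartitions())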
# let's just run TPC-H Q1
q = '''
SELECT
l_returnflag,
l_linestatus,
sum(l_quantity) as sum_qty,
sum(l_extendedprice) as sum_base_price,
sum(l_extendedprice * (1 - l_discount)) as sum_disc_price,
sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as sum_charge,
avg(l_quantity) as avg_qty,
avg(l_extendedprice) as avg_price,
avg(l_discount) as avg_disc,
count(*) as count_order
FROM
lineitem
WHERE
l_shipdate <= date '1998-12-01' - interval '90' day
GROUP BY
l_returnflag,
l_linestatus
ORDER BY
l_returnflag,
l_linestatus;
'''
# explain() prints the plan itself, so no outer print() is needed
spark.sql(q).explain()
print(spark.sql(q).collect())
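# Sketch for the TODO above: reading a specific DuckLake snapshot. DuckLake
# supports time travel in SQL via AT (VERSION => ...) or AT (TIMESTAMP => ...);
# the snapshot id below is a made-up example and this path is untested here.
# A single-partition read is used for simplicity (no partitionColumn options).
snapshot_version = 3  # hypothetical snapshot id
snapshot_df = (jdbc_setup()
    .option('query', f'SELECT * FROM "{table_name}" AT (VERSION => {snapshot_version})')
    .load())
snapshot_df.createOrReplaceTempView(f'{table_name}_v{snapshot_version}')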