-
-
Save hannes/395ac65766cdbddb0972ab337224fcb4 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import pyspark

# Connection string for the DuckLake catalog (Postgres-backed) and the
# table this script exposes to Spark.
ducklake_uri = 'postgres:dbname=ducklake'
table_name = 'lineitem'

# still TODO: read a specific snapshot

# Spark session with the DuckDB JDBC driver pulled onto the classpath.
# NOTE(review): '-Djava.security.manager=allow' is presumably required for
# the driver to load on newer JVMs — confirm against the DuckDB JDBC docs.
builder = pyspark.sql.SparkSession.builder
builder = builder.config('spark.jars.packages', 'org.duckdb:duckdb_jdbc:1.3.1.0')
builder = builder.config('spark.driver.defaultJavaOptions', '-Djava.security.manager=allow')
spark = builder.getOrCreate()

# Synthetic column name used below to partition the JDBC read by file index.
index_col_name = '__ducklake_file_index'
def jdbc_setup():
    """Return a JDBC DataFrameReader pre-configured for the DuckLake catalog."""
    reader = spark.read.format('jdbc')
    reader = reader.option('driver', 'org.duckdb.DuckDBDriver')
    return reader.option('url', f'jdbc:duckdb:ducklake:{ducklake_uri}')
# Probe DuckLake for the min/max/count of data-file indexes; these values
# become Spark's JDBC partitioning bounds, so the read is split roughly
# one partition per distinct file index.
bounds_query = f'''
SELECT
    min(file_index::BIGINT)::STRING min_index,
    (max(file_index::BIGINT)+1)::STRING max_index,
    count(DISTINCT file_index::BIGINT)::STRING num_files
FROM "{table_name}"'''
partitioning_info = jdbc_setup().option('query', bounds_query).load().collect()[0]

# Wrap the table in a subquery that surfaces the file index as an ordinary
# BIGINT column Spark can partition on.
# NOTE(review): the trailing comma before FROM is accepted by DuckDB
# ("friendly SQL"); it is not a typo.
wrapped_table = f'(SELECT *, file_index::BIGINT {index_col_name}, FROM "{table_name}") "{table_name}"'
table_wrapper = (jdbc_setup()
                 .option('dbtable', wrapped_table)
                 .option('partitionColumn', index_col_name)
                 .option('lowerBound', partitioning_info['min_index'])
                 .option('upperBound', partitioning_info['max_index'])
                 .option('numPartitions', partitioning_info['num_files'])
                 .load())

# Expose the partitioned read under the original table name for SQL queries.
table_wrapper.createOrReplaceTempView(table_name)
# lets just run q1 (TPC-H query 1) against the temp view
q = '''
SELECT
    l_returnflag,
    l_linestatus,
    sum(l_quantity) as sum_qty,
    sum(l_extendedprice) as sum_base_price,
    sum(l_extendedprice * (1 - l_discount)) as sum_disc_price,
    sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as sum_charge,
    avg(l_quantity) as avg_qty,
    avg(l_extendedprice) as avg_price,
    avg(l_discount) as avg_disc,
    count(*) as count_order
FROM
    lineitem
WHERE
    l_shipdate <= date '1998-12-01' - interval '90' day
GROUP BY
    l_returnflag,
    l_linestatus
ORDER BY
    l_returnflag,
    l_linestatus;
'''
# Build the DataFrame once instead of twice.  DataFrame.explain() prints the
# plan itself and returns None, so the original print(...explain()) emitted a
# spurious "None" line — call it directly.
result = spark.sql(q)
result.explain()
print(result.collect())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment