Last active: February 10, 2025 01:36
-
-
Save wirelessr/37b19323664cff6f9af42bd814f05a5d to your computer and use it in GitHub Desktop.
TPC-DS Built for Iceberg
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Local Iceberg playground: Spark (with notebooks), an Iceberg REST catalog,
# MinIO as S3-compatible object storage, and a one-shot `mc` client that
# (re)creates the `warehouse` bucket.
#
# `version` is ignored (with a warning) by Compose v2; it is kept so the file
# still parses with the legacy docker-compose 1.x tool.
version: "3"

services:
  spark-iceberg:
    image: tabulario/spark-iceberg
    container_name: spark-iceberg
    build: spark/
    networks:
      iceberg_net:
    depends_on:
      - rest
      - minio
    volumes:
      - ./warehouse:/home/iceberg/warehouse
      - ./notebooks:/home/iceberg/notebooks/notebooks
      # Host directory holding the generated TPC-DS .dat files; the import
      # script reads them from /home/iceberg/sample.
      - ./sample:/home/iceberg/sample
    environment:
      - AWS_ACCESS_KEY_ID=admin
      - AWS_SECRET_ACCESS_KEY=password
      - AWS_REGION=us-east-1
    ports:
      - "8888:8888"
      - "8080:8080"
      - "10000:10000"
      - "10001:10001"

  rest:
    image: tabulario/iceberg-rest
    container_name: iceberg-rest
    networks:
      iceberg_net:
        aliases:
          - iceberg-rest.minio
    ports:
      - "8181:8181"
    environment:
      - AWS_ACCESS_KEY_ID=admin
      - AWS_SECRET_ACCESS_KEY=password
      - AWS_REGION=us-east-1
      - CATALOG_WAREHOUSE=s3://warehouse/
      - CATALOG_IO__IMPL=org.apache.iceberg.aws.s3.S3FileIO
      - CATALOG_S3_ENDPOINT=http://minio:9000

  minio:
    image: minio/minio
    container_name: minio
    environment:
      - MINIO_ROOT_USER=admin
      - MINIO_ROOT_PASSWORD=password
      - MINIO_DOMAIN=minio
    networks:
      iceberg_net:
        aliases:
          # Virtual-host style address for the `warehouse` bucket.
          - warehouse.minio
    ports:
      - "9001:9001"
      - "9000:9000"
    command: ["server", "/data", "--console-address", ":9001"]

  mc:
    depends_on:
      - minio
    image: minio/mc
    container_name: mc
    networks:
      iceberg_net:
    environment:
      - AWS_ACCESS_KEY_ID=admin
      - AWS_SECRET_ACCESS_KEY=password
      - AWS_REGION=us-east-1
    # Waits for MinIO, then recreates the `warehouse` bucket and makes it
    # public. WARNING: `mc rm -r --force` deletes all existing warehouse data
    # every time this container starts.
    # `mc alias set` / `mc anonymous set` replace the removed
    # `mc config host add` / `mc policy set` commands of older mc releases.
    entrypoint: >
      /bin/sh -c "
      until (/usr/bin/mc alias set minio http://minio:9000 admin password) do echo '...waiting...' && sleep 1; done;
      /usr/bin/mc rm -r --force minio/warehouse;
      /usr/bin/mc mb minio/warehouse;
      /usr/bin/mc anonymous set public minio/warehouse;
      tail -f /dev/null
      "

networks:
  iceberg_net:
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from pyspark.sql import SparkSession

# dsdgen output files; each "<table>.dat" is loaded into demo.test.<table>.
dat = [
    'call_center.dat',
    'catalog_page.dat',
    'catalog_returns.dat',
    'catalog_sales.dat',
    'customer.dat',
    'customer_address.dat',
    'customer_demographics.dat',
    'date_dim.dat',
    # 'dbgen_version.dat',
    'household_demographics.dat',
    'income_band.dat',
    'inventory.dat',
    'item.dat',
    'promotion.dat',
    'reason.dat',
    'ship_mode.dat',
    'store.dat',
    'store_returns.dat',
    'store_sales.dat',
    'time_dim.dat',
    'warehouse.dat',
    'web_page.dat',
    'web_returns.dat',
    'web_sales.dat',
    'web_site.dat',
]

spark = SparkSession.builder.appName("Import CSV").getOrCreate()
for f in dat:
    target = f"demo.test.{f[:-4]}"  # strip ".dat" to get the table name
    # Parse each file with the target table's own schema instead of
    # inferSchema=True. Inference needs an extra full pass over every file.
    # It can also type numeric-looking string columns (such as zero-padded
    # codes) as integers, which drops leading zeros when insertInto casts
    # them back to strings.
    # Each row ends with an empty trailing field, so the read schema gets one
    # extra placeholder column that is dropped again below.
    schema = spark.table(target).schema.add("_trailing_empty", "string")
    df = spark.read.csv(
        f"file:///home/iceberg/sample/{f}",
        schema=schema,
        header=False,
        sep="|",
    )
    df = df.drop(df.columns[-1])  # drop the last empty column
    # insertInto matches columns by position; the read schema mirrors the table.
    df.write.mode("append").insertInto(target)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import re
import argparse


def _qualify_table_name(stmt, catalog, database):
    """Prefix the table created by ``stmt`` with ``catalog.database``.

    ``re.MULTILINE`` lets ``^`` match the ``create table`` line even when the
    statement is preceded by ``--`` comment lines (e.g. a file header before
    the first statement). A function replacement is used so backslashes in
    ``catalog``/``database`` are inserted literally.
    """
    return re.sub(
        r'^create\s+table\s+(\w+)',
        lambda m: f'create table {catalog}.{database}.{m.group(1)}',
        stmt,
        count=1,
        flags=re.IGNORECASE | re.MULTILINE,
    )


def _primary_key_to_partition(lines):
    """Replace a ``primary key (...)`` line with a trailing ``PARTITIONED BY (...)``.

    ``lines`` is the list of (right-stripped) lines of one statement; it is
    modified in place and returned. Statements without a primary key are
    returned unchanged.
    """
    for pk_index, line in enumerate(lines):
        match = re.search(r'primary\s+key\s*\(([^)]+)\)', line, re.IGNORECASE)
        if match:
            break
    else:
        return lines

    pk_columns = match.group(1).strip()
    if not pk_columns:
        return lines

    # If the key line ends with ',' more entries follow it, so the comma on
    # the preceding line is still needed; only strip it when the key was last.
    pk_was_last_entry = not lines[pk_index].rstrip().endswith(',')
    del lines[pk_index]
    if pk_was_last_entry and pk_index > 0:
        prev_line = lines[pk_index - 1].rstrip()
        if prev_line.endswith(','):
            lines[pk_index - 1] = prev_line[:-1].rstrip()

    # Append PARTITIONED BY after the last ')' of the statement, which is the
    # closing paren of the column list. If that line does not end with ')',
    # the substitution is a no-op and no partition clause is added.
    for i in range(len(lines) - 1, -1, -1):
        if ')' in lines[i]:
            lines[i] = re.sub(
                r'\)\s*$',
                lambda _m: f') PARTITIONED BY ({pk_columns})',
                lines[i],
            )
            break
    return lines


def modify_ddl(input_file, output_file, catalog, database):
    """Rewrite a ``;``-separated CREATE TABLE script for an Iceberg catalog.

    For every non-empty statement in ``input_file``:

    * the created table is qualified as ``catalog.database.<table>``;
    * a ``primary key (cols)`` line is removed and ``PARTITIONED BY (cols)``
      is appended after the column list's closing parenthesis.

    The rewritten statements, each terminated by ``;``, are written to
    ``output_file`` separated by blank lines.

    NOTE(review): this partitions by the primary-key columns. Those are
    unique per row, so the result is roughly one partition per row. Confirm
    this is intended before loading large tables.

    Args:
        input_file: Path of the source DDL script.
        output_file: Path the rewritten script is written to (overwritten).
        catalog: Catalog name to prefix table names with.
        database: Database/namespace name to prefix table names with.
    """
    with open(input_file, 'r', encoding='utf-8') as f:
        content = f.read()

    modified_statements = []
    for stmt in content.split(';'):
        stmt = stmt.strip()
        if not stmt:
            continue
        stmt = _qualify_table_name(stmt, catalog, database)
        lines = [line.rstrip() for line in stmt.split('\n')]
        lines = _primary_key_to_partition(lines)
        modified_statements.append('\n'.join(lines).strip() + ';')

    new_content = '\n\n'.join(modified_statements)
    if new_content:
        new_content += '\n'  # text files conventionally end with a newline
    with open(output_file, 'w', encoding='utf-8') as f:
        f.write(new_content)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.