TPC-DS Built for Iceberg
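The Docker Compose file below wires up the stack: a Spark client with Iceberg preinstalled, an Iceberg REST catalog, and MinIO standing in as S3-compatible object storage, plus a one-shot mc container that bootstraps the warehouse bucket.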
version: "3"
services:
  # Spark client with Iceberg preinstalled; also serves Jupyter on 8888.
  spark-iceberg:
    image: tabulario/spark-iceberg
    container_name: spark-iceberg
    build: spark/
    networks:
      iceberg_net:
    depends_on:
      - rest
      - minio
    volumes:
      - ./warehouse:/home/iceberg/warehouse
      - ./notebooks:/home/iceberg/notebooks/notebooks
      - ./sample:/home/iceberg/sample
    environment:
      - AWS_ACCESS_KEY_ID=admin
      - AWS_SECRET_ACCESS_KEY=password
      - AWS_REGION=us-east-1
    ports:
      - 8888:8888
      - 8080:8080
      - 10000:10000
      - 10001:10001
  # Iceberg REST catalog, keeping table metadata in the MinIO warehouse bucket.
  rest:
    image: tabulario/iceberg-rest
    container_name: iceberg-rest
    networks:
      iceberg_net:
        aliases:
          - iceberg-rest.minio
    ports:
      - 8181:8181
    environment:
      - AWS_ACCESS_KEY_ID=admin
      - AWS_SECRET_ACCESS_KEY=password
      - AWS_REGION=us-east-1
      - CATALOG_WAREHOUSE=s3://warehouse/
      - CATALOG_IO__IMPL=org.apache.iceberg.aws.s3.S3FileIO
      - CATALOG_S3_ENDPOINT=http://minio:9000
  # S3-compatible object storage backing the warehouse.
  minio:
    image: minio/minio
    container_name: minio
    environment:
      - MINIO_ROOT_USER=admin
      - MINIO_ROOT_PASSWORD=password
      - MINIO_DOMAIN=minio
    networks:
      iceberg_net:
        aliases:
          - warehouse.minio
    ports:
      - 9001:9001
      - 9000:9000
    command: ["server", "/data", "--console-address", ":9001"]
  # One-shot MinIO client that (re)creates the warehouse bucket on startup.
  mc:
    depends_on:
      - minio
    image: minio/mc
    container_name: mc
    networks:
      iceberg_net:
    environment:
      - AWS_ACCESS_KEY_ID=admin
      - AWS_SECRET_ACCESS_KEY=password
      - AWS_REGION=us-east-1
    entrypoint: >
      /bin/sh -c "
      until (/usr/bin/mc config host add minio http://minio:9000 admin password) do echo '...waiting...' && sleep 1; done;
      /usr/bin/mc rm -r --force minio/warehouse;
      /usr/bin/mc mb minio/warehouse;
      /usr/bin/mc policy set public minio/warehouse;
      tail -f /dev/null
      "
networks:
  iceberg_net:
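The tabulario/spark-iceberg image ships preconfigured (via spark-defaults.conf) to reach the REST catalog and MinIO, which is why the import script further down can simply call SparkSession.builder.getOrCreate(). For reference, a minimal sketch of the equivalent explicit session configuration, assuming the catalog name demo used throughout this gist and the endpoints from the compose file:

    from pyspark.sql import SparkSession

    # Sketch only: the container already sets these via spark-defaults.conf
    # and bundles the Iceberg Spark runtime jars.
    spark = (
        SparkSession.builder
        .appName("iceberg-demo")
        .config("spark.sql.extensions",
                "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
        .config("spark.sql.catalog.demo", "org.apache.iceberg.spark.SparkCatalog")
        .config("spark.sql.catalog.demo.type", "rest")
        .config("spark.sql.catalog.demo.uri", "http://rest:8181")
        .config("spark.sql.catalog.demo.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
        .config("spark.sql.catalog.demo.warehouse", "s3://warehouse/")
        .config("spark.sql.catalog.demo.s3.endpoint", "http://minio:9000")
        .getOrCreate()
    )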
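With the stack running and the dsdgen flat files mounted under /home/iceberg/sample, the next script loads each TPC-DS table into the demo.test namespace.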
from pyspark.sql import SparkSession

# TPC-DS flat files produced by dsdgen; dbgen_version holds generator
# metadata and is skipped.
dat = [
    'call_center.dat',
    'catalog_page.dat',
    'catalog_returns.dat',
    'catalog_sales.dat',
    'customer.dat',
    'customer_address.dat',
    'customer_demographics.dat',
    'date_dim.dat',
    # 'dbgen_version.dat',
    'household_demographics.dat',
    'income_band.dat',
    'inventory.dat',
    'item.dat',
    'promotion.dat',
    'reason.dat',
    'ship_mode.dat',
    'store.dat',
    'store_returns.dat',
    'store_sales.dat',
    'time_dim.dat',
    'warehouse.dat',
    'web_page.dat',
    'web_returns.dat',
    'web_sales.dat',
    'web_site.dat',
]

spark = SparkSession.builder.appName("Import CSV").getOrCreate()

for f in dat:
    table = f[:-4]  # strip the '.dat' suffix
    df = spark.read.csv(f"file:///home/iceberg/sample/{f}",
                        header=False, inferSchema=True, sep="|")
    # dsdgen terminates every row with '|', which parses as an empty
    # final column; drop it.
    df = df.drop(df.columns[-1])
    # insertInto resolves columns by position against the pre-created table.
    df.write.mode("append").insertInto(f"demo.test.{table}")
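Once the load finishes, a quick sanity check confirms every table received rows. A sketch, reusing the dat list and spark session from the script above:

    # Assumes `dat` and `spark` as defined in the import script.
    for f in dat:
        table = f"demo.test.{f[:-4]}"
        print(f"{table}: {spark.table(table).count()} rows")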
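The target tables come from the stock TPC-DS DDL, rewritten by the script below: it qualifies each table name with a catalog and database, strips the PRIMARY KEY clause (which Iceberg DDL does not accept), and reuses the key columns as the PARTITIONED BY spec. A minimal argparse entry point is sketched at the bottom; its argument names are assumptions, not part of the original.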
import re
import argparse


def modify_ddl(input_file, output_file, catalog, database):
    with open(input_file, 'r') as f:
        content = f.read()

    statements = content.split(';')
    modified_statements = []

    for stmt in statements:
        stmt = stmt.strip()
        if not stmt:
            continue

        # Qualify the table name with the target catalog and database.
        stmt = re.sub(
            r'^create\s+table\s+(\w+)',
            f'create table {catalog}.{database}.\\1',
            stmt,
            flags=re.IGNORECASE
        )

        lines = [line.rstrip() for line in stmt.split('\n')]
        pk_line_index = -1
        pk_columns = None

        # Find the primary key declaration.
        for i, line in enumerate(lines):
            match = re.search(r'primary\s+key\s*\(([^)]+)\)', line, re.IGNORECASE)
            if match:
                pk_line_index = i
                pk_columns = match.group(1).strip()
                break

        if pk_line_index != -1 and pk_columns:
            # Remove the primary key line...
            del lines[pk_line_index]
            # ...and the trailing comma on the line before it.
            if pk_line_index > 0:
                prev_line = lines[pk_line_index - 1].rstrip()
                if prev_line.endswith(','):
                    lines[pk_line_index - 1] = prev_line[:-1].rstrip()

            # Reuse the former key columns as the Iceberg partition spec,
            # appended after the closing parenthesis of the column list.
            closing_line = -1
            for i in reversed(range(len(lines))):
                if ')' in lines[i]:
                    closing_line = i
                    break
            if closing_line != -1:
                lines[closing_line] = re.sub(
                    r'\)\s*$',
                    f') PARTITIONED BY ({pk_columns})',
                    lines[closing_line]
                )

        modified_stmt = '\n'.join(lines)
        modified_statements.append(modified_stmt.strip() + ';')

    new_content = '\n\n'.join(modified_statements)
    with open(output_file, 'w') as f:
        f.write(new_content)


if __name__ == "__main__":
    # Minimal CLI wrapper so the script is runnable as-is; the argument
    # names and defaults are assumptions, chosen to match the import script.
    parser = argparse.ArgumentParser(description="Rewrite TPC-DS DDL for Iceberg")
    parser.add_argument("input_file")
    parser.add_argument("output_file")
    parser.add_argument("--catalog", default="demo")
    parser.add_argument("--database", default="test")
    args = parser.parse_args()
    modify_ddl(args.input_file, args.output_file, args.catalog, args.database)
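A hypothetical invocation (file names assumed), with the transformation it performs on one statement:

    # Hypothetical file names; catalog and database match the import script.
    modify_ddl("tpcds.sql", "tpcds_iceberg.sql", catalog="demo", database="test")

    # Before: create table call_center ( cc_call_center_sk integer not null,
    #                                    ..., primary key (cc_call_center_sk) );
    # After:  create table demo.test.call_center ( cc_call_center_sk integer not null,
    #                                    ... ) PARTITIONED BY (cc_call_center_sk);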