TPC-DS Built for Iceberg
version: "3"
services:
spark-iceberg:
image: tabulario/spark-iceberg
container_name: spark-iceberg
build: spark/
networks:
iceberg_net:
depends_on:
- rest
- minio
volumes:
- ./warehouse:/home/iceberg/warehouse
- ./notebooks:/home/iceberg/notebooks/notebooks
- ./sample:/home/iceberg/sample
environment:
- AWS_ACCESS_KEY_ID=admin
- AWS_SECRET_ACCESS_KEY=password
- AWS_REGION=us-east-1
ports:
- 8888:8888
- 8080:8080
- 10000:10000
- 10001:10001
rest:
image: tabulario/iceberg-rest
container_name: iceberg-rest
networks:
iceberg_net:
aliases:
- iceberg-rest.minio
ports:
- 8181:8181
environment:
- AWS_ACCESS_KEY_ID=admin
- AWS_SECRET_ACCESS_KEY=password
- AWS_REGION=us-east-1
- CATALOG_WAREHOUSE=s3://warehouse/
- CATALOG_IO__IMPL=org.apache.iceberg.aws.s3.S3FileIO
- CATALOG_S3_ENDPOINT=http://minio:9000
minio:
image: minio/minio
container_name: minio
environment:
- MINIO_ROOT_USER=admin
- MINIO_ROOT_PASSWORD=password
- MINIO_DOMAIN=minio
networks:
iceberg_net:
aliases:
- warehouse.minio
ports:
- 9001:9001
- 9000:9000
command: ["server", "/data", "--console-address", ":9001"]
mc:
depends_on:
- minio
image: minio/mc
container_name: mc
networks:
iceberg_net:
environment:
- AWS_ACCESS_KEY_ID=admin
- AWS_SECRET_ACCESS_KEY=password
- AWS_REGION=us-east-1
entrypoint: >
/bin/sh -c "
until (/usr/bin/mc config host add minio http://minio:9000 admin password) do echo '...waiting...' && sleep 1; done;
/usr/bin/mc rm -r --force minio/warehouse;
/usr/bin/mc mb minio/warehouse;
/usr/bin/mc policy set public minio/warehouse;
tail -f /dev/null
"
networks:
iceberg_net:
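
The compose file only brings up Spark, the Iceberg REST catalog, and MinIO; the TPC-DS tables still need to exist in the catalog before the loader below can insertInto them. A minimal sketch of that step from inside the spark-iceberg container, assuming the namespace is demo.test and the DDL rewritten by modify_ddl at the end of this gist was saved as /home/iceberg/sample/tpcds_iceberg.sql (both the namespace and the file path are assumptions):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Create TPC-DS tables").getOrCreate()

# Create the target namespace in the REST catalog, then run each CREATE TABLE
# statement from the rewritten DDL (the path is an assumed output of modify_ddl).
spark.sql("CREATE NAMESPACE IF NOT EXISTS demo.test")

with open("/home/iceberg/sample/tpcds_iceberg.sql") as f:
    for stmt in f.read().split(";"):
        if stmt.strip():
            spark.sql(stmt)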
from pyspark.sql import SparkSession

# TPC-DS flat files produced by dsdgen; dbgen_version only holds generator metadata, so it is skipped.
dat = [
    'call_center.dat',
    'catalog_page.dat',
    'catalog_returns.dat',
    'catalog_sales.dat',
    'customer.dat',
    'customer_address.dat',
    'customer_demographics.dat',
    'date_dim.dat',
    # 'dbgen_version.dat',
    'household_demographics.dat',
    'income_band.dat',
    'inventory.dat',
    'item.dat',
    'promotion.dat',
    'reason.dat',
    'ship_mode.dat',
    'store.dat',
    'store_returns.dat',
    'store_sales.dat',
    'time_dim.dat',
    'warehouse.dat',
    'web_page.dat',
    'web_returns.dat',
    'web_sales.dat',
    'web_site.dat'
]

spark = SparkSession.builder.appName("Import CSV").getOrCreate()

for f in dat:
    # dsdgen writes pipe-delimited rows with a trailing delimiter, so Spark reads one extra empty column.
    df = spark.read.csv(f"file:///home/iceberg/sample/{f[:-4]}.dat", header=False, inferSchema=True, sep="|")
    df = df.drop(df.columns[-1])  # drop the last empty column
    df.write.mode("append").insertInto(f"demo.test.{f[:-4]}")
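
A quick way to sanity-check the load is to count rows per table; a small sketch that reuses the dat list and spark session from the loader above:

# Sanity check: print the row count of every imported table.
for f in dat:
    table = f"demo.test.{f[:-4]}"
    print(f"{table}: {spark.table(table).count()} rows")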
import re
import argparse


def modify_ddl(input_file, output_file, catalog, database):
    """Rewrite TPC-DS DDL for Iceberg: qualify each table name with the catalog
    and database, drop the primary key clause, and partition the table by the
    former primary key columns."""
    with open(input_file, 'r') as f:
        content = f.read()

    statements = content.split(';')
    modified_statements = []

    for stmt in statements:
        stmt = stmt.strip()
        if not stmt:
            continue

        # Qualify the table name: "create table foo" -> "create table <catalog>.<database>.foo"
        stmt = re.sub(
            r'^create\s+table\s+(\w+)',
            f'create table {catalog}.{database}.\\1',
            stmt,
            flags=re.IGNORECASE
        )

        lines = [line.rstrip() for line in stmt.split('\n')]
        pk_line_index = -1
        pk_columns = None

        # Find the primary key clause and remember its columns
        for i, line in enumerate(lines):
            match = re.search(r'primary\s+key\s*\(([^)]+)\)', line, re.IGNORECASE)
            if match:
                pk_line_index = i
                pk_columns = match.group(1).strip()
                break

        if pk_line_index != -1 and pk_columns:
            # Remove the primary key line
            del lines[pk_line_index]

            # Remove the trailing comma left on the preceding column definition
            if pk_line_index > 0:
                prev_line = lines[pk_line_index - 1].rstrip()
                if prev_line.endswith(','):
                    lines[pk_line_index - 1] = prev_line[:-1].rstrip()

            # Append PARTITIONED BY after the closing parenthesis
            closing_line = -1
            for i in reversed(range(len(lines))):
                if ')' in lines[i]:
                    closing_line = i
                    break

            if closing_line != -1:
                lines[closing_line] = re.sub(
                    r'\)\s*$',
                    f') PARTITIONED BY ({pk_columns})',
                    lines[closing_line]
                )

        modified_stmt = '\n'.join(lines)
        modified_statements.append(modified_stmt.strip() + ';')

    new_content = '\n\n'.join(modified_statements)

    with open(output_file, 'w') as f:
        f.write(new_content)
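
argparse is imported above but never wired up; a minimal CLI entry point along these lines would make the script runnable directly (the flag names and defaults are assumptions):

if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Rewrite TPC-DS DDL for an Iceberg catalog")
    parser.add_argument("input_file", help="path to the original TPC-DS DDL file")
    parser.add_argument("output_file", help="where to write the modified DDL")
    parser.add_argument("--catalog", default="demo", help="Iceberg catalog name (assumed default)")
    parser.add_argument("--database", default="test", help="target database/namespace (assumed default)")
    args = parser.parse_args()
    modify_ddl(args.input_file, args.output_file, args.catalog, args.database)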