Skip to content

Instantly share code, notes, and snippets.

@timvw
Last active May 12, 2023 10:32
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save timvw/a9174052f5bdf2922d087d0d623ac0fb to your computer and use it in GitHub Desktop.
DBT on EMR/EKS with spark
from pyspark.sql import SparkSession
from dbt.cli.main import dbtRunner, dbtRunnerResult
import sys
def init_spark() -> SparkSession:
    """Create (or reuse) a Hive-enabled SparkSession configured for Delta Lake.

    Returns:
        The active SparkSession, with dynamic partition overwrite, snappy
        parquet compression, Kryo serialization, Arrow-backed pandas
        conversion, and the Delta Lake SQL extension/catalog enabled.
    """
    # Keep the settings in one place; order matches the original chained calls.
    settings = {
        "spark.sql.sources.partitionOverwriteMode": "dynamic",
        "spark.sql.parquet.compression.codec": "snappy",
        "mapreduce.fileoutputcommitter.algorithm.version": "2",
        # Delta Lake integration: SQL extension plus the Delta-aware catalog.
        "spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension",
        "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.delta.catalog.DeltaCatalog",
        "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
        "spark.sql.execution.arrow.pyspark.enabled": "true",
    }
    builder = SparkSession.builder.appName("tvwassen_dbt_spark")
    for key, value in settings.items():
        builder = builder.config(key, value)
    # Hive support is required so dbt's session connection sees the metastore.
    return builder.enableHiveSupport().getOrCreate()
if __name__ == '__main__':
    # Start Spark first so dbt's "session" connection method can attach to it.
    session = init_spark()

    # Forward every CLI argument (minus the program name) straight to dbt,
    # e.g. `run --select my_model`.
    runner = dbtRunner()
    result: dbtRunnerResult = runner.invoke(sys.argv[1:])
    status = 0 if result.success else 1

    # Shut down the Py4J callback server so the JVM gateway can exit cleanly
    # instead of keeping the driver process alive.
    session.sparkContext._gateway.shutdown_callback_server()
    sys.exit(status)
# Stage 1: use Maven purely as a downloader to fetch the Hudi and Delta jars
# from Maven Central into /opt/jars (no project build happens here).
FROM maven:3.6.3-jdk-8 as builder
RUN mvn dependency:copy -Dartifact=org.apache.hudi:hudi-spark3.3-bundle_2.12:0.13.0 -DoutputDirectory=/opt/jars
RUN mvn dependency:copy -Dartifact=io.delta:delta-core_2.12:2.2.0 -DoutputDirectory=/opt/jars
RUN mvn dependency:copy -Dartifact=io.delta:delta-storage:2.2.0 -DoutputDirectory=/opt/jars

# Stage 2: extend the EMR-on-EKS 6.9.0 Spark runtime image.
FROM 107292555468.dkr.ecr.eu-central-1.amazonaws.com/spark/emr-6.9.0:latest
# Place the fetched jars on Spark's default classpath so the Delta/Hudi
# configs set by the driver script resolve at runtime.
COPY --from=builder /opt/jars/hudi-spark3.3-bundle_2.12-0.13.0.jar /usr/lib/spark/jars
COPY --from=builder /opt/jars/delta-core_2.12-2.2.0.jar /usr/lib/spark/jars
COPY --from=builder /opt/jars/delta-storage-2.2.0.jar /usr/lib/spark/jars
WORKDIR /app
# Root is needed for system-wide pip installs; drop back to hadoop afterwards.
USER root
RUN pip3 install dbt-core==1.5
# --no-deps keeps dbt-spark from pulling pyspark (the image already ships Spark);
# sqlparams is the one transitive dependency still required.
RUN pip3 install 'dbt-spark[session]==1.5' --no-deps
RUN pip3 install 'sqlparams>=3.0.0'
USER hadoop
# User-level pip console scripts (dbt) live under ~/.local/bin.
ENV PATH="$PATH:/home/hadoop/.local/bin"
ENV DBT_PROFILES_DIR=/app
ENV DBT_DEBUG=false
# Logs go to the EMR log volume; compiled artifacts to ephemeral /tmp.
ENV DBT_LOG_PATH=/mnt/var/log/dbt
ENV DBT_TARGET_PATH=/tmp/dbt/target
# Copy only the files `dbt deps` needs first, so dependency resolution is
# cached in its own layer and doesn't rerun on every model change.
COPY dbt_project.yml dbt_project.yml
COPY packages.yml packages.yml
RUN dbt deps
# Project sources copied after deps for better layer-cache behavior.
COPY app/*.py .
COPY profiles.yml profiles.yml
COPY analyses analyses
COPY seeds seeds
COPY snapshots snapshots
COPY macros macros
COPY models models
COPY tests tests
# dbt profiles.yml — profile name must match `profile:` in dbt_project.yml.
# NOTE(review): the scraped source had all indentation flattened, which is not
# valid dbt profile structure; this restores the canonical nesting.
tvwassen_dbt_spark:
  outputs:
    dev:
      type: spark
      # "session" attaches dbt to the already-running SparkSession in the
      # driver process, so host/port are effectively unused placeholders.
      method: session
      host: localhost
      port: 443
      schema: default
      threads: 1
  target: dev
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment