Skip to content

Instantly share code, notes, and snippets.

@sungchun12
Created May 10, 2023 08:05
Show Gist options
  • Save sungchun12/f7239abd3a5cf05fcaa152b8e94a8cec to your computer and use it in GitHub Desktop.
Save sungchun12/f7239abd3a5cf05fcaa152b8e94a8cec to your computer and use it in GitHub Desktop.
Simple quickstart for a dbt Core DAG using the BashOperator with BigQuery
# This is an example DAG that is a good reference for your portfolio
import json
import os
import tempfile
from datetime import datetime, timedelta

from airflow import DAG
from airflow.models import Connection
from airflow.operators.bash import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.empty import EmptyOperator
def save_keyfile_to_temp_file(gcp_conn_id: str) -> str:
    """Write the keyfile stored on an Airflow connection to a temporary JSON file.

    The connection is looked up by ``gcp_conn_id`` and its extras are searched
    for the service-account keyfile. The keyfile may be stored either as a JSON
    string or as an already-parsed mapping; both are written out as JSON text.

    Args:
        gcp_conn_id: ID of the Airflow connection holding the keyfile.

    Returns:
        Path of the newly created ``.json`` temporary file. The file is not
        deleted by this function; the caller owns its lifetime.

    Raises:
        KeyError: If the connection extras contain no keyfile entry.
    """
    conn = Connection.get_connection_from_secrets(conn_id=gcp_conn_id)
    extras = conn.extra_dejson

    # Older Google provider releases stored the field under a prefixed key,
    # so accept either spelling.
    for key in ("keyfile_dict", "extra__google_cloud_platform__keyfile_dict"):
        if key in extras:
            keyfile = extras[key]
            break
    else:
        raise KeyError(
            f"Connection {gcp_conn_id!r} has no 'keyfile_dict' in its extras"
        )

    # A string is assumed to already be JSON text; anything else (typically a
    # dict) must be serialized, otherwise f.write() raises TypeError.
    keyfile_json = keyfile if isinstance(keyfile, str) else json.dumps(keyfile)

    # Create a temporary file with a .json extension
    fd, temp_file_path = tempfile.mkstemp(suffix=".json", text=True)
    with os.fdopen(fd, "w", encoding="utf-8") as f:
        f.write(keyfile_json)
    return temp_file_path
# TODO: replace the conn_id with your own
GCP_CONN_ID = "your_bigquery_conn_id"
# NOTE: this call runs at module import, so every parse of this file looks up
# the connection and writes a new temporary keyfile.
JSON_KEYFILE_PATH = save_keyfile_to_temp_file(GCP_CONN_ID)
# Activation script of the virtualenv that the dbt task sources before running
PATH_TO_DBT_VENV = "/home/astro/dbt_venv/bin/activate"

# TODO replace these vars with your own
GIT_OWNER = "sungchun12"
GIT_REPO = "dbt-bigquery-example"
PROJECT_ID = "dbt-demo-386220"
GIT_BRANCH = "main"

# Build the clone URL from GIT_OWNER/GIT_REPO so the cloned directory always
# matches the `cd {GIT_REPO}` below when these values are changed.
git_clone_cmds = f"""
git clone https://github.com/{GIT_OWNER}/{GIT_REPO}.git"""

# Clone the project, check out the branch, export the settings dbt needs and
# install dbt packages.
# NOTE(review): PROJECT_ID and DBT_GOOGLE_BIGQUERY_KEYFILE are presumably read
# by the dbt project's profile via env_var() -- confirm against that repo.
dbt_setup_cmds = f"""
{git_clone_cmds} &&
cd {GIT_REPO} &&
git checkout {GIT_BRANCH} &&
export PROJECT_ID={PROJECT_ID} &&
export DBT_GOOGLE_BIGQUERY_KEYFILE={JSON_KEYFILE_PATH} &&
dbt deps"""

# Full command chain: setup, then load seed files, then run the models
dbt_run_cmd = f"""
{dbt_setup_cmds} &&
dbt seed &&
dbt run
"""
# Defaults handed to the DAG through its `default_args` parameter:
# no dependency on previous runs, no e-mail alerts, one retry after 5 minutes.
default_args = dict(
    owner="airflow",
    depends_on_past=False,
    email_on_failure=False,
    email_on_retry=False,
    retries=1,
    retry_delay=timedelta(minutes=5),
)
# Define the DAG: scheduled once per day from the start date, with catchup
# disabled.
dag = DAG(
    "dbt_example_dag",
    default_args=default_args,
    # The description names what this file actually does: the dbt step is a
    # BashOperator running `dbt seed` and `dbt run`.
    description="An example DAG with extract, load tasks and dbt seed/run using BashOperator",
    schedule_interval=timedelta(days=1),
    catchup=False,
    start_date=datetime(2023, 5, 9),
)
# Placeholder for the extract task. EmptyOperator is the current name of the
# deprecated DummyOperator; swap in a real extract step here.
extract_task = EmptyOperator(
    task_id="extract_task",
    dag=dag,
)
# Placeholder for the load task; swap in a real load step here.
load_task = EmptyOperator(
    task_id="load_task",
    dag=dag,
)
# Activate the dbt virtualenv, then run the clone/setup/seed/run command chain
dbt_run_task = BashOperator(
    task_id="dbt_run_task",
    bash_command=f"source $PATH_TO_DBT_VENV && {dbt_run_cmd}",
    # $PATH_TO_DBT_VENV in the command above is expanded by bash from this mapping
    env={"PATH_TO_DBT_VENV": PATH_TO_DBT_VENV},
    # Attach to the DAG explicitly, like the sibling tasks, instead of relying
    # on the dependency chain below to do it.
    dag=dag,
)
# Define the task dependencies
extract_task >> load_task >> dbt_run_task
FROM quay.io/astronomer/astro-runtime:7.4.2
# Install apache-airflow-providers-google package
RUN pip install apache-airflow-providers-google
# Switch to root user for installing git
USER root
# Install git and clean up the APT cache in the same layer; cleaning up in a
# later RUN would leave the cached files in the earlier layer of the image
RUN apt-get update && \
    apt-get install -y git && \
    apt-get clean && \
    rm -rf /var/lib/apt/lists/*
# Switch back to the original user
USER astro
# Create a Python virtual environment and install dbt-bigquery
ENV DBT_VENV_PATH=/home/astro/dbt_venv
RUN python -m venv $DBT_VENV_PATH && \
    . $DBT_VENV_PATH/bin/activate && \
    pip install dbt-bigquery==1.5.0
@sungchun12
Copy link
Author

sungchun12 commented May 10, 2023

In the Airflow connection, fill in the Keyfile JSON field with the contents of your service account key file.

image

@sungchun12
Copy link
Author

Evidence it works
image

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment