@jeremyyeo
Last active March 22, 2024 14:00
dbt Cloud Slim CI using GitHub Actions #dbt

dbt Cloud Slim CI using GitHub Actions

Video: https://www.loom.com/share/020361ffc62e49ffa55deabd819fe166

Reproducing dbt Cloud Slim CI functionality using GitHub Actions - for cases when the dbt Cloud native GitHub App cannot be used or Slim CI isn't working as expected.

The GitHub Action workflow used in the video above is available below in dbt-cloud-actions-fal.yml.

Update (2023-02-27) - there can be scenarios where GitHub Administrators block GitHub Actions that are not from GitHub itself, so I have added a new workflow (dbt-cloud-actions-native.yml) which uses a Python script saved in the repo itself (dbt_cloud.py) instead of using Fal's action as shown in the video above.

  1. Add all the relevant secrets to your repository's secrets section.
  2. Customers who are not on dbt Cloud multi-tenant (i.e. 'https://cloud.getdbt.com') will want to customize where GitHub Actions sends its API requests (the DBT_CLOUD_URL variable in the workflow).
  3. Add dbt-cloud-actions-native.yml to your .github/workflows/ folder.
  4. Add dbt_cloud.py to a python/ folder (it can live anywhere; you just have to point the run step at the folder where you've put the Python script).
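
The secrets from step 1 reach the Python script as environment variables, and a missing one only surfaces as a KeyError at import time. A minimal sketch of a preflight check, assuming the variable names that dbt_cloud.py reads; `REQUIRED` and `missing_settings` are hypothetical names and not part of the gist:

```python
import os

# Variable names taken from dbt_cloud.py; the helper itself is hypothetical.
REQUIRED = (
    "DBT_CLOUD_API_TOKEN",
    "DBT_CLOUD_ACCOUNT_ID",
    "DBT_CLOUD_PROJECT_ID",
    "DBT_CLOUD_JOB_ID",
)


def missing_settings(env=None):
    """Return the required variable names that are unset or empty."""
    env = os.environ if env is None else env
    return [name for name in REQUIRED if not env.get(name)]
```

Calling `missing_settings()` at the top of a script and exiting when the list is non-empty would give a clearer error than a bare KeyError.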
# dbt-cloud-actions-fal.yml
name: Run dbt Cloud jobs

on:
  pull_request:
    types: [opened, synchronize, closed]

jobs:
  run_ci_job:
    # Only when PR is opened or new commits added.
    if: ${{ github.event_name == 'pull_request' && contains(fromJSON('["opened", "synchronize"]'), github.event.action) }}
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      # Use fal's dbt Cloud GitHub Action.
      - uses: fal-ai/dbt-cloud-action@main
        id: ci_job
        with:
          dbt_cloud_token: ${{ secrets.DBT_CLOUD_API_TOKEN }}
          dbt_cloud_account_id: ${{ secrets.DBT_CLOUD_ACCOUNT_ID }}
          dbt_cloud_job_id: ${{ secrets.DBT_CLOUD_JOB_ID }}
          interval: 5
          failure_on_error: true
          cause: "github_actions_pull_request"
          # The commit sha we want to checkout.
          # https://docs.github.com/en/actions/using-workflows/events-that-trigger-workflows#pull_request
          git_sha: ${{ github.event.pull_request.head.sha }}
          # Follow dbt Cloud Slim CI schema override pattern (e.g. dbt_cloud_pr_123_456).
          schema_override: "dbt_cloud_pr_${{ secrets.DBT_CLOUD_JOB_ID }}_${{ github.event.number }}"
          steps_override: |
            dbt build -s state:modified

  drop_pr_schema:
    # Only when PR is closed/merged.
    if: ${{ github.event_name == 'pull_request' && github.event.action == 'closed' }}
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - uses: fal-ai/dbt-cloud-action@main
        id: drop_ci_schema
        with:
          dbt_cloud_token: ${{ secrets.DBT_CLOUD_API_TOKEN }}
          dbt_cloud_account_id: ${{ secrets.DBT_CLOUD_ACCOUNT_ID }}
          dbt_cloud_job_id: ${{ secrets.DBT_CLOUD_JOB_ID }}
          interval: 5
          failure_on_error: false
          cause: "github_actions_pull_request"
          git_branch: master # Production branch.
          schema_override: "dbt_cloud_pr_${{ secrets.DBT_CLOUD_JOB_ID }}_${{ github.event.number }}"
          steps_override: |
            dbt run-operation drop_pr_schema
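
Both jobs above pass the same schema_override so that the drop job removes exactly the schema the CI job built. A minimal sketch of how that name is put together, assuming the job ID and PR number are available as plain values; `pr_schema_name` is a hypothetical helper, and the hyphen replacement mirrors what dbt_cloud.py does to the override before sending it:

```python
def pr_schema_name(job_id, pr_number):
    """Build the Slim CI style schema name, e.g. dbt_cloud_pr_123_456."""
    name = f"dbt_cloud_pr_{job_id}_{pr_number}"
    # dbt_cloud.py swaps hyphens for underscores in the override it sends.
    return name.replace("-", "_")


print(pr_schema_name(123, 456))
```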
# .github/workflows/dbt-cloud-actions-native.yml
name: Run dbt Cloud jobs

on:
  pull_request:
    types: [opened, synchronize, closed]

env:
  DBT_CLOUD_API_TOKEN: ${{ secrets.DBT_CLOUD_API_TOKEN }}
  DBT_CLOUD_ACCOUNT_ID: ${{ secrets.DBT_CLOUD_ACCOUNT_ID }}
  DBT_CLOUD_PROJECT_ID: ${{ secrets.DBT_CLOUD_PROJECT_ID }}
  DBT_CLOUD_JOB_ID: ${{ secrets.DBT_CLOUD_JOB_ID }}
  DBT_CLOUD_URL: "https://cloud.getdbt.com" # Change this to your own dbt Cloud instance.

jobs:
  run_ci_job:
    # Only when PR is opened or new commits added.
    if: ${{ github.event_name == 'pull_request' && contains(fromJSON('["opened", "synchronize"]'), github.event.action) }}
    runs-on: ubuntu-latest
    env:
      DBT_CLOUD_JOB_BRANCH: ${{ github.head_ref }}
      DBT_CLOUD_JOB_SCHEMA_OVERRIDE: "dbt_cloud_pr_${{ secrets.DBT_CLOUD_JOB_ID }}_${{ github.event.number }}"
    steps:
      - uses: actions/checkout@v3
      - uses: actions/setup-python@v2
        with:
          python-version: "3.9"
      - name: Run dbt Cloud job
        run: "python python/dbt_cloud.py -s 'dbt run -s state:modified'"

  drop_pr_schema:
    # Only when PR is closed/merged.
    if: ${{ github.event_name == 'pull_request' && github.event.action == 'closed' }}
    runs-on: ubuntu-latest
    env:
      DBT_CLOUD_JOB_SCHEMA_OVERRIDE: "dbt_cloud_pr_${{ secrets.DBT_CLOUD_JOB_ID }}_${{ github.event.number }}"
    steps:
      - uses: actions/checkout@v3
      - uses: actions/setup-python@v2
        with:
          python-version: "3.9"
      - name: Drop PR schema
        run: "python python/dbt_cloud.py -s 'dbt run-operation drop_pr_schema'"
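
The run steps above hand the dbt command to the script as a single quoted argument after -s. A small sketch of how that is parsed, reusing the same argparse definition as dbt_cloud.py; `parse_steps` is a hypothetical wrapper added only so the parsing can be tried in isolation:

```python
import argparse


def parse_steps(argv):
    """Parse -s/--steps the same way dbt_cloud.py declares it."""
    parser = argparse.ArgumentParser()
    parser.add_argument("-s", "--steps", type=str, nargs="+")
    return parser.parse_args(argv).steps


# Each quoted shell argument becomes one entry in the list that the
# script later sends as steps_override.
print(parse_steps(["-s", "dbt run -s state:modified"]))
print(parse_steps(["-s", "dbt seed", "dbt run"]))
```

Because nargs="+" collects a list, several steps can be passed as separate quoted arguments; when -s is omitted the value is None and the script sends no steps_override at all.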
# python/dbt_cloud.py
from urllib.request import Request, urlopen
import argparse
import json
import time
import os

parser = argparse.ArgumentParser()
parser.add_argument("-s", "--steps", type=str, nargs="+")
args = parser.parse_args()

# fmt: off
api_base        = os.getenv("DBT_CLOUD_URL", "https://cloud.getdbt.com")
job_cause       = os.getenv("DBT_CLOUD_JOB_CAUSE", "github_actions_pull_request")
git_branch      = os.getenv("DBT_CLOUD_JOB_BRANCH", None)
schema_override = os.getenv("DBT_CLOUD_JOB_SCHEMA_OVERRIDE", None)
api_token       = os.environ["DBT_CLOUD_API_TOKEN"]
account_id      = os.environ["DBT_CLOUD_ACCOUNT_ID"]
project_id      = os.environ["DBT_CLOUD_PROJECT_ID"]
job_id          = os.environ["DBT_CLOUD_JOB_ID"]

job_config = f"""
Request configuration:
  api_base: {api_base}
  job_cause: {job_cause}
  git_branch: {git_branch}
  schema_override: {schema_override}
  account_id: {account_id}
  project_id: {project_id}
  job_id: {job_id}
"""
# fmt: on

req_auth_header = {
    "Authorization": f"Token {api_token}",
    "Content-Type": "application/json",
}
req_job_url = f"{api_base}/api/v2/accounts/{account_id}/jobs/{job_id}/run/"

# Numeric run status codes returned by the dbt Cloud API.
run_status_map = {
    1: "Queued",
    2: "Starting",
    3: "Running",
    10: "Success",
    20: "Error",
    30: "Cancelled",
}


def run_dbt_cloud_job(
    url,
    headers,
    cause,
    branch=None,
    schema_override=None,
    steps=None,
) -> int:
    """Trigger a dbt Cloud job and return the ID of the run it started."""
    req_payload = {"cause": cause}
    if branch:
        req_payload["git_branch"] = branch.replace("refs/heads/", "")
    if schema_override:
        req_payload["schema_override"] = schema_override.replace("-", "_")
    if steps:
        req_payload["steps_override"] = steps

    print(f"Triggering job:\n url: {url}\n payload: {req_payload}\n")
    data = json.dumps(req_payload).encode()
    request = Request(method="POST", data=data, headers=headers, url=url)
    with urlopen(request) as req:
        response = req.read().decode("utf-8")
        run_job_resp = json.loads(response)
    return run_job_resp["data"]["id"]


def get_run_status(url, headers) -> str:
    """Get the status of a dbt Cloud run."""
    request = Request(headers=headers, url=url)
    with urlopen(request) as req:
        response = req.read().decode("utf-8")
        req_status_resp = json.loads(response)
    run_status_code = req_status_resp["data"]["status"]
    run_status = run_status_map[run_status_code]
    return run_status


def main():
    print(job_config)
    job_steps = args.steps
    run_id: int = None
    try:
        run_id = run_dbt_cloud_job(
            url=req_job_url,
            headers=req_auth_header,
            cause=job_cause,
            branch=git_branch,
            schema_override=schema_override,
            steps=job_steps,
        )
    except Exception as e:
        print(f"ERROR! - Could not trigger dbt Cloud job:\n{e}")
        raise

    req_status_url = f"{api_base}/api/v2/accounts/{account_id}/runs/{run_id}/"
    run_status_link = (
        f"{api_base}/#/accounts/{account_id}/projects/{project_id}/runs/{run_id}/"
    )

    # Give the run time to start, then poll every 10 seconds until it finishes.
    time.sleep(30)
    while True:
        run_status = get_run_status(req_status_url, req_auth_header)
        print(f"Run status -> {run_status}")
        if run_status in ["Error", "Cancelled"]:
            raise Exception(f"Run failed or cancelled. See why at {run_status_link}")
        if run_status == "Success":
            print(f"Job completed successfully! See details at {run_status_link}")
            return
        time.sleep(10)


if __name__ == "__main__":
    main()
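
The polling loop in main() keeps checking every 10 seconds until the run reaches Success, Error or Cancelled, with no upper bound of its own. A minimal sketch of a bounded variant, assuming the same status strings as run_status_map; `wait_for_run`, `timeout_s` and the injectable `sleep`/`clock` parameters are hypothetical and not part of the script:

```python
import time

# Status strings taken from run_status_map in dbt_cloud.py.
TERMINAL_FAILURES = {"Error", "Cancelled"}


def wait_for_run(get_status, timeout_s=3600, interval_s=10,
                 sleep=time.sleep, clock=time.monotonic):
    """Poll get_status() until success, failure, or the timeout expires."""
    deadline = clock() + timeout_s
    while True:
        status = get_status()
        if status == "Success":
            return status
        if status in TERMINAL_FAILURES:
            raise RuntimeError(f"Run ended with status {status}")
        if clock() >= deadline:
            raise TimeoutError(f"Run still {status} after {timeout_s}s")
        sleep(interval_s)
```

In the script this could be called as `wait_for_run(lambda: get_run_status(req_status_url, req_auth_header))`; passing `sleep` and `clock` as parameters is only there to make the loop easy to exercise without waiting.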
-- macros/drop_pr_schema.sql
{% macro drop_pr_schema() %}
    {% set target_schema = target.schema | lower %}
    {% if execute %}
        {#/* Explicit check for the PR schema pattern - error if it's something other than 'dbt_cloud_pr_X_Y' */#}
        {% if not target_schema.startswith("dbt_cloud_pr") %}
            {% do exceptions.raise_compiler_error("You may be dropping a non PR schema " ~ target.schema) %}
        {% endif %}
        {% set schema_to_drop = target.database ~ '.' ~ target_schema %}
        {% do log('Attempting to drop PR schema ' ~ schema_to_drop ~ '.', info=True) %}
        {% set query %}
            drop schema if exists {{ schema_to_drop }};
        {% endset %}
        {% do run_query(query) %}
        {% do log('PR schema ' ~ schema_to_drop ~ ' dropped.', info=True) %}
    {% endif %}
{% endmacro %}
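
The macro's guard is a prefix check, so any schema whose lowercased name starts with dbt_cloud_pr passes, including names that are not of the form dbt_cloud_pr_X_Y. A sketch of a stricter check, written in Python for illustration and assuming that both the job ID and the PR number are purely numeric; `PR_SCHEMA` and `is_pr_schema` are hypothetical names:

```python
import re

# Assumes numeric job ID and PR number, i.e. dbt_cloud_pr_<digits>_<digits>.
PR_SCHEMA = re.compile(r"dbt_cloud_pr_\d+_\d+")


def is_pr_schema(schema):
    """Return True only for names of the form dbt_cloud_pr_X_Y."""
    return PR_SCHEMA.fullmatch(schema.lower()) is not None


for name in ("dbt_cloud_pr_123_456", "DBT_CLOUD_PR_1_2", "dbt_cloud_prod", "analytics"):
    print(name, is_pr_schema(name))
```

A name like dbt_cloud_prod passes the macro's startswith check but fails this one; whether that stricter behaviour is wanted depends on how schemas are named in your project.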