Complete Tutorial: Azure, Snowflake, DBT, and Python with Medallion Architecture

Table of Contents

  1. Introduction
  2. Architecture Overview
  3. Setting Up the Environment
    3.1. Azure Setup
    3.2. Snowflake Setup
    3.3. DBT Setup
    3.4. Python Environment Setup
  4. Data Ingestion with Azure Data Factory
  5. Medallion Architecture Implementation
    5.1. Bronze Layer: Raw Data in Snowflake
    5.2. Silver Layer: Cleansed and Conformed Data
    5.3. Gold Layer: Business-Level Aggregates
  6. DBT Models and Transformations
  7. Python Scripts for Orchestration and Additional Processing
  8. Scheduling and Monitoring
  9. Testing and Quality Assurance
  10. Performance Optimization
  11. Security and Compliance
  12. Best Practices and Tips
  13. Conclusion

1. Introduction

This tutorial will guide you through building a robust data pipeline using Azure, Snowflake, DBT, and Python, following the medallion architecture. This setup is designed for teams looking to implement a scalable, maintainable, and efficient data solution.

2. Architecture Overview

Our architecture consists of:

  • Azure Data Factory for data ingestion and orchestration
  • Azure Blob Storage for storing raw data
  • Snowflake as our cloud data warehouse
  • DBT for data transformation and modeling
  • Python for custom processing and orchestration

We'll follow the medallion architecture:

  • Bronze: Raw data ingested from source systems (created by Azure Data Factory)
  • Silver: Cleansed and conformed data (transformed by DBT)
  • Gold: Business-level aggregates (created by DBT)

3. Setting Up the Environment

3.1. Azure Setup

  1. Create an Azure account and set up a resource group.
  2. Set up Azure Data Factory:
    az datafactory create --name "YourDataFactory" --resource-group "YourResourceGroup" --location "EastUS"
  3. Create an Azure Blob Storage account:
    az storage account create --name "YourStorageAccount" --resource-group "YourResourceGroup" --location "EastUS" --sku Standard_LRS

3.2. Snowflake Setup

  1. Sign up for a Snowflake account.
  2. Create a Snowflake warehouse, database, and schema:
    CREATE WAREHOUSE IF NOT EXISTS compute_wh WITH WAREHOUSE_SIZE = 'XSMALL' AUTO_SUSPEND = 60 AUTO_RESUME = TRUE;
    CREATE DATABASE IF NOT EXISTS your_database;
    USE DATABASE your_database;
    CREATE SCHEMA IF NOT EXISTS your_schema;

3.3. DBT Setup

  1. Install DBT with Snowflake adapter:
    pip install dbt-snowflake
    
  2. Initialize a DBT project:
    dbt init your_project_name
    
  3. Configure profiles.yml for Snowflake connection:
    your_project_name:
      target: dev
      outputs:
        dev:
          type: snowflake
          account: your_account  # account identifier only, without ".snowflakecomputing.com"
          user: your_username
          password: your_password
          role: your_role
          database: your_database
          warehouse: compute_wh
          schema: your_schema
          threads: 4

3.4. Python Environment Setup

  1. Set up a virtual environment:
    python -m venv data_pipeline_env
    source data_pipeline_env/bin/activate  # On Windows: data_pipeline_env\Scripts\activate
    
  2. Install required packages:
    pip install azure-storage-blob azure-identity snowflake-connector-python pandas
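
Before moving on, it can help to confirm that Python can reach both Snowflake and Blob Storage. The sketch below assumes the placeholder account, credentials, and storage URL used elsewhere in this tutorial, and that DefaultAzureCredential can find a credential (for example via az login or a managed identity):

# connectivity_check.py -- a minimal sketch to verify both connections.
# Account, credentials, and the storage URL below are placeholders.
import snowflake.connector
from azure.identity import DefaultAzureCredential
from azure.storage.blob import BlobServiceClient

def check_snowflake():
    # Uses the warehouse, database, and schema created in section 3.2
    conn = snowflake.connector.connect(
        account="your_account",
        user="your_username",
        password="your_password",
        warehouse="compute_wh",
        database="your_database",
        schema="your_schema",
    )
    try:
        version, = conn.cursor().execute("SELECT CURRENT_VERSION()").fetchone()
        print(f"Snowflake connection OK (version {version})")
    finally:
        conn.close()

def check_blob_storage():
    # DefaultAzureCredential resolves az login, environment variables, or a managed identity
    client = BlobServiceClient(
        account_url="https://yourstorageaccount.blob.core.windows.net",
        credential=DefaultAzureCredential(),
    )
    containers = [c.name for c in client.list_containers()]
    print(f"Blob Storage connection OK (containers: {containers})")

if __name__ == "__main__":
    check_snowflake()
    check_blob_storage()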
    

4. Data Ingestion with Azure Data Factory

  1. In Azure Data Factory, create a pipeline for data ingestion.
  2. Set up a copy activity to move data from your source to Azure Blob Storage.
  3. Use Azure Data Factory's Snowflake connector to copy data from Blob Storage to Snowflake, creating the bronze layer tables.

Example Azure Data Factory pipeline (JSON snippet):

{
    "name": "IngestDataToSnowflake",
    "properties": {
        "activities": [
            {
                "name": "CopyFromSourceToBlob",
                "type": "Copy",
                "inputs": [{"referenceName": "SourceDataset", "type": "DatasetReference"}],
                "outputs": [{"referenceName": "AzureBlobDataset", "type": "DatasetReference"}]
            },
            {
                "name": "CopyFromBlobToSnowflake",
                "type": "Copy",
                "dependsOn": [{"activity": "CopyFromSourceToBlob", "dependencyConditions": ["Succeeded"]}],
                "inputs": [{"referenceName": "AzureBlobDataset", "type": "DatasetReference"}],
                "outputs": [{"referenceName": "SnowflakeBronzeDataset", "type": "DatasetReference"}],
                "typeProperties": {
                    "source": {
                        "type": "BlobSource"
                    },
                    "sink": {
                        "type": "SnowflakeSink",
                        "tableCreation": "Auto"
                    }
                }
            }
        ]
    }
}

In this setup, ADF will automatically create the bronze tables in Snowflake based on the source data structure.
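
If you want to trigger this pipeline from Python rather than the Azure portal, the azure-mgmt-datafactory package exposes a pipeline-run API. It isn't in the section 3.4 install list, so add it with pip install azure-mgmt-datafactory; the subscription ID, resource group, and factory name below are placeholders, and the pipeline name matches the JSON above:

# trigger_ingestion.py -- a sketch for starting the ADF pipeline from Python.
# Requires: pip install azure-mgmt-datafactory
# Subscription ID, resource group, and factory name are placeholders.
from azure.identity import DefaultAzureCredential
from azure.mgmt.datafactory import DataFactoryManagementClient

SUBSCRIPTION_ID = "your-subscription-id"
RESOURCE_GROUP = "YourResourceGroup"
FACTORY_NAME = "YourDataFactory"

adf_client = DataFactoryManagementClient(DefaultAzureCredential(), SUBSCRIPTION_ID)

# Start the pipeline defined above and print the run ID for later monitoring
run_response = adf_client.pipelines.create_run(
    RESOURCE_GROUP, FACTORY_NAME, "IngestDataToSnowflake", parameters={}
)
print(f"Started pipeline run: {run_response.run_id}")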

5. Medallion Architecture Implementation

5.1. Bronze Layer: Raw Data in Snowflake

Since Azure Data Factory is creating the bronze tables, we'll use DBT to document and test these tables.

Create a file models/bronze/bronze_tables.yml:

version: 2

sources:
  - name: bronze
    database: your_database
    schema: your_schema
    tables:
      - name: raw_data
        description: "Bronze layer table created by Azure Data Factory"
        columns:
          - name: id
            description: "Unique identifier for each record"
            tests:
              - unique
              - not_null
          - name: name
            description: "Name field from the source data"
          - name: created_at
            description: "Timestamp of when the record was created"

This YAML file documents the structure of the bronze table and sets up some basic tests.

5.2. Silver Layer: Cleansed and Conformed Data

Create a model models/silver/cleaned_data.sql:

{{ config(materialized='table') }}

SELECT
    id,
    TRIM(LOWER(name)) AS name,
    created_at::DATE AS created_date
FROM {{ source('bronze', 'raw_data') }}
WHERE id IS NOT NULL

5.3. Gold Layer: Business-Level Aggregates

Create another DBT model models/gold/user_stats.sql:

{{ config(materialized='table') }}

SELECT
    created_date,
    COUNT(DISTINCT id) AS new_users,
    COUNT(*) AS total_records
FROM {{ ref('cleaned_data') }}
GROUP BY created_date
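
Once dbt run has built this model, you can pull the aggregates into pandas for a quick sanity check or for downstream processing. This sketch uses the packages installed in section 3.4; credentials are placeholders, and it assumes user_stats was materialized in your_schema:

# preview_gold.py -- a sketch for reading the gold aggregates into pandas.
# Credentials are placeholders; the table location assumes the default target schema.
import pandas as pd
import snowflake.connector

conn = snowflake.connector.connect(
    account="your_account",
    user="your_username",
    password="your_password",
    warehouse="compute_wh",
    database="your_database",
    schema="your_schema",
)
try:
    cur = conn.cursor()
    cur.execute("SELECT created_date, new_users, total_records FROM user_stats ORDER BY created_date")
    df = pd.DataFrame(cur.fetchall(), columns=[col[0] for col in cur.description])
    print(df.head())
finally:
    conn.close()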

6. DBT Models and Transformations

Organize your DBT project:

models/
├── bronze/
│   └── bronze_tables.yml
├── silver/
│   └── cleaned_data.sql
└── gold/
    └── user_stats.sql

Run your DBT models:

dbt run

7. Python Scripts for Orchestration and Additional Processing

Create a Python orchestration script, orchestrate.py:

import subprocess
from azure.identity import DefaultAzureCredential
from azure.storage.blob import BlobServiceClient

def run_dbt():
    # Run `dbt run` and return whether it succeeded plus its combined output for logging
    result = subprocess.run(['dbt', 'run'], capture_output=True, text=True)
    return result.returncode == 0, result.stdout + result.stderr

def upload_logs_to_azure(container_name, log_content):
    # DefaultAzureCredential resolves az login, environment variables, or a managed identity;
    # the target container must already exist in the storage account
    credential = DefaultAzureCredential()
    blob_service_client = BlobServiceClient(account_url="https://yourstorageaccount.blob.core.windows.net", credential=credential)
    blob_client = blob_service_client.get_blob_client(container=container_name, blob="dbt_run.log")
    blob_client.upload_blob(log_content, overwrite=True)

if __name__ == "__main__":
    success, output = run_dbt()
    status = "DBT run completed successfully" if success else "DBT run failed"
    upload_logs_to_azure('logs', f"{status}\n\n{output}")

8. Scheduling and Monitoring

  1. Use Azure Data Factory to schedule your Python script and DBT runs.
  2. Set up Azure Monitor to track your pipeline's performance and set alerts.
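
Beyond the Azure Monitor portal, you can also poll a pipeline run's status from Python. The sketch below assumes the azure-mgmt-datafactory package and the run ID returned by the trigger sketch in section 4:

# monitor_pipeline_run.py -- a sketch for polling an ADF pipeline run's status.
# Requires azure-mgmt-datafactory; the run ID comes from the trigger sketch in section 4.
import time

from azure.identity import DefaultAzureCredential
from azure.mgmt.datafactory import DataFactoryManagementClient

SUBSCRIPTION_ID = "your-subscription-id"
RESOURCE_GROUP = "YourResourceGroup"
FACTORY_NAME = "YourDataFactory"

def wait_for_run(run_id, poll_seconds=30):
    adf_client = DataFactoryManagementClient(DefaultAzureCredential(), SUBSCRIPTION_ID)
    while True:
        run = adf_client.pipeline_runs.get(RESOURCE_GROUP, FACTORY_NAME, run_id)
        print(f"Pipeline run {run_id}: {run.status}")
        # Keep polling while the run is still queued, in progress, or cancelling
        if run.status not in ("Queued", "InProgress", "Canceling"):
            return run.status
        time.sleep(poll_seconds)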

9. Testing and Quality Assurance

  1. Implement DBT tests. In models/silver/cleaned_data.yml:
version: 2

models:
  - name: cleaned_data
    columns:
      - name: id
        tests:
          - unique
          - not_null
      - name: name
        tests:
          - not_null
  2. Run DBT tests:
dbt test
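
To make test failures visible to a scheduler or CI job, you can wrap dbt test the same way section 7 wraps dbt run. A minimal sketch:

# run_dbt_tests.py -- a sketch that propagates dbt test failures as a non-zero exit code.
import subprocess
import sys

result = subprocess.run(['dbt', 'test'], capture_output=True, text=True)
print(result.stdout)
if result.returncode != 0:
    print(result.stderr, file=sys.stderr)
    sys.exit(result.returncode)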

10. Performance Optimization

  1. Use Snowflake's CLUSTER BY for frequently filtered columns:
{{ config(
    materialized='table',
    cluster_by=['created_date']
) }}

-- Your SQL model here
  2. Optimize Snowflake warehouse size and auto-scaling settings, as sketched below.
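
Because warehouse settings are plain SQL, they are easy to adjust from a script. The sketch below issues an ALTER WAREHOUSE from Python; the size and auto-suspend values are illustrative, credentials are placeholders, and the role needs sufficient privileges on the warehouse:

# resize_warehouse.py -- a sketch for adjusting warehouse settings from Python.
# The size and auto-suspend values are illustrative; credentials are placeholders.
import snowflake.connector

conn = snowflake.connector.connect(
    account="your_account",
    user="your_username",
    password="your_password",
    role="your_role",
)
try:
    conn.cursor().execute(
        "ALTER WAREHOUSE compute_wh SET WAREHOUSE_SIZE = 'SMALL' AUTO_SUSPEND = 60"
    )
finally:
    conn.close()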

11. Security and Compliance

  1. Use Azure Key Vault to store sensitive information such as your Snowflake credentials (a retrieval sketch follows the example below).
  2. Implement row-level security in Snowflake for sensitive data using a row access policy:
CREATE OR REPLACE ROW ACCESS POLICY row_access_policy AS (created_date DATE) RETURNS BOOLEAN ->
    CURRENT_ROLE() IN ('ANALYST_ROLE', 'ADMIN_ROLE');

ALTER TABLE gold.user_stats ADD ROW ACCESS POLICY row_access_policy ON (created_date);
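
For item 1, the azure-keyvault-secrets package (an additional install, not in the section 3.4 list) lets your scripts read the Snowflake password from Key Vault instead of hard-coding it. A sketch, assuming a vault named your-key-vault and a secret named snowflake-password:

# get_snowflake_password.py -- a sketch for reading a secret from Azure Key Vault.
# Requires: pip install azure-keyvault-secrets
# The vault URL and secret name are assumptions for illustration.
from azure.identity import DefaultAzureCredential
from azure.keyvault.secrets import SecretClient

secret_client = SecretClient(
    vault_url="https://your-key-vault.vault.azure.net",
    credential=DefaultAzureCredential(),
)
snowflake_password = secret_client.get_secret("snowflake-password").value
# Pass snowflake_password to snowflake.connector.connect(...) instead of a hard-coded value.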

12. Best Practices and Tips

  1. Use DBT documentation to keep track of your data models.
  2. Implement CI/CD for your DBT project using Azure DevOps.
  3. Regularly review and optimize your SQL transformations and Snowflake resource usage.
  4. Use Snowflake's Time Travel for data recovery and auditing.
  5. Leverage Snowflake's zero-copy cloning for testing environments.
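
To illustrate tips 4 and 5, both Time Travel and zero-copy cloning are plain SQL, so they can be scripted. The sketch below assumes raw_data has at least an hour of Time Travel history and that your role can create databases; credentials are placeholders:

# clone_and_time_travel.py -- a sketch illustrating tips 4 and 5.
# Credentials are placeholders; the role needs privileges to create databases.
import snowflake.connector

conn = snowflake.connector.connect(
    account="your_account",
    user="your_username",
    password="your_password",
    warehouse="compute_wh",
    database="your_database",
    schema="your_schema",
)
try:
    cur = conn.cursor()
    # Time Travel: query the bronze table as it looked one hour ago
    cur.execute("SELECT COUNT(*) FROM raw_data AT(OFFSET => -3600)")
    print(f"raw_data one hour ago: {cur.fetchone()[0]} rows")
    # Zero-copy clone: create a test environment without duplicating storage
    cur.execute("CREATE DATABASE IF NOT EXISTS your_database_dev CLONE your_database")
finally:
    conn.close()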

13. Conclusion

This comprehensive tutorial has guided you through setting up a robust data pipeline using Azure, Snowflake, DBT, and Python with a medallion architecture. Key points to remember:

  1. Azure Data Factory handles data ingestion and creates bronze tables.
  2. DBT is used for transforming data in silver and gold layers, as well as documenting and testing bronze tables.
  3. Python scripts can be used for additional orchestration and processing.
  4. Regular testing, monitoring, and optimization are crucial for maintaining a healthy data pipeline.

By following these steps, you've created a scalable, maintainable, and efficient data transformation process that separates raw data ingestion, data cleaning, and business-level aggregations. Remember to continually refine your models, optimize performance, and maintain security as your data needs evolve.
