Execute a Snowflake job based on whether a table has been updated

Step 1: Define the Snowflake Resource

from datetime import datetime, timezone

from dagster import (
    AssetKey,
    AssetSelection,
    AssetSpec,
    DagsterEventType,
    Definitions,
    EventRecordsFilter,
    MetadataValue,
    ObserveResult,
    RunRequest,
    ScheduleDefinition,
    SensorEvaluationContext,
    SkipReason,
    asset,
    define_asset_job,
    multi_observable_source_asset,
    sensor,
)

from dagster_snowflake import SnowflakeResource

snowflake_resource = SnowflakeResource(
    account="your_account",
    user="your_user",
    password="your_password",
    database="your_database",
    schema="your_schema",
    warehouse="your_warehouse",
)
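
Hardcoding credentials works for a demo, but in practice you'd likely pull the secret from the environment. A minimal variation, assuming the password is exported as SNOWFLAKE_PASSWORD (the variable name is just an example), uses Dagster's EnvVar so the value is resolved at runtime rather than baked into code:

from dagster import EnvVar

snowflake_resource = SnowflakeResource(
    account="your_account",
    user="your_user",
    password=EnvVar("SNOWFLAKE_PASSWORD"),  # read from the environment at launch time
    database="your_database",
    schema="your_schema",
    warehouse="your_warehouse",
)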

Step 2: Create an Observable Source Asset

TABLE_SCHEMA = "PUBLIC"
table_name = "my_table"
asset_spec = AssetSpec(table_name)

@multi_observable_source_asset(specs=[asset_spec])
def my_table(snowflake: SnowflakeResource):
    with snowflake.get_connection() as conn:
        cursor = conn.cursor()
        cursor.execute(f"SELECT MAX(loaded_at) FROM {TABLE_SCHEMA}.{table_name}")
        # MetadataValue.timestamp expects a unix float or a timezone-aware
        # datetime, so loaded_at is assumed to be a TIMESTAMP_TZ column
        last_updated = cursor.fetchone()[0]
        yield ObserveResult(
            asset_key=table_name,
            metadata={
                "dagster/last_updated_timestamp": MetadataValue.timestamp(last_updated)
            },
        )

# define a downstream asset that depends on the observed table (stub body)
@asset(deps=[AssetKey(table_name)])
def downstream_table():
    return 1

downstream_job = define_asset_job("downstream_job", selection=AssetSelection.assets(downstream_table))


# Define the Observation Job
observation_job = define_asset_job(
    "observation_job",
    selection=AssetSelection.assets(my_table)
)

# Create a Schedule
observation_schedule = ScheduleDefinition(
    name="hourly_observation_schedule",
    cron_schedule="0 * * * *",  # Every hour
    job=observation_job,
)

Step 3: Define the Sensor

@sensor(jobs=[downstream_job])
def my_asset_observation_sensor(context: SensorEvaluationContext):
    asset_key = AssetKey(table_name)
    instance = context.instance

    # Fetch the most recent observation event for the asset
    records = instance.get_event_records(
        event_records_filter=EventRecordsFilter(
            event_type=DagsterEventType.ASSET_OBSERVATION,
            asset_key=asset_key,
        ),
        limit=1,
    )

    if not records:
        yield SkipReason("No observations recorded for the asset yet.")
        return

    latest_record = records[0]
    latest_observation = latest_record.asset_observation

    # Extract the last_updated timestamp from the observation metadata
    last_updated_metadata = latest_observation.metadata.get("dagster/last_updated_timestamp")
    if last_updated_metadata is None:
        yield SkipReason("Latest observation is missing dagster/last_updated_timestamp.")
        return

    last_updated_timestamp = datetime.fromtimestamp(last_updated_metadata.value, tz=timezone.utc)

    # The cursor stores the last_updated timestamp from the run we last triggered,
    # so a new run fires only when the table has actually changed since then
    cursor_timestamp = (
        datetime.fromtimestamp(float(context.cursor), tz=timezone.utc)
        if context.cursor
        else None
    )

    if cursor_timestamp is None or last_updated_timestamp > cursor_timestamp:
        context.update_cursor(str(last_updated_metadata.value))

        # The Snowflake resource is already bound on the Definitions object,
        # so the run request doesn't need a run_config override
        yield RunRequest(
            run_key=f"run_for_{asset_key.to_user_string()}_{last_updated_timestamp}",
        )
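
To sanity-check the sensor logic without deploying it, Dagster sensors support direct invocation with build_sensor_context. A minimal sketch against an ephemeral instance (which holds no observation events, so the sensor should yield a SkipReason):

from dagster import DagsterInstance, build_sensor_context

instance = DagsterInstance.ephemeral()
context = build_sensor_context(instance=instance, cursor=None)
for result in my_asset_observation_sensor(context):
    print(result)  # expect a SkipReason until observations exist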

Step 4: Set up the downstream job

The downstream job was already defined alongside the assets in Step 2; it is shown again here on its own:

from dagster import AssetSelection, define_asset_job

downstream_job = define_asset_job("downstream_job", selection=AssetSelection.assets(downstream_table))

Step 5: Combine Everything in Definitions

from dagster import Definitions

defs = Definitions(
    assets=[downstream_table, my_table],
    jobs=[downstream_job, observation_job],
    sensors=[my_asset_observation_sensor],
    resources={"snowflake": snowflake_resource},
    schedules=[observation_schedule],
)
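
With all of this in a single file (say definitions.py), a quick end-to-end check is to resolve the observation job from defs and run it in-process; this assumes the Snowflake credentials are valid and PUBLIC.my_table exists:

if __name__ == "__main__":
    # Resolve the asset job (with its bound resources) and execute it in-process
    result = defs.get_job_def("observation_job").execute_in_process()
    assert result.success

From there, dagster dev -f definitions.py starts the local UI, where the hourly observation schedule and the sensor can be turned on.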