@slopp
slopp / README.md
Created February 27, 2024 16:14
SQL Server to GCS to BQ Dagster Pipeline Example

This example shows a skeleton for how to build a Dagster project that extracts tables from SQL Server, stores the extract as a CSV in GCS, and then uploads the GCS extract to BigQuery.

The actual extract and load logic is omitted. But the purpose of this project is to show how such a pipeline can be represented in Dagster assets.

First, a single pipeline for one table is created. This is demonstrated in the file dagster_mock_one_table.py. To run this example:

  1. Create a Python virtual environment and then run:

     ```
     pip install dagster dagster-webserver
     ```
slopp / fn_profile.yaml
Created January 23, 2024 16:01
Dagster with a custom DSL
```yaml
profile:
  name: FN
  stocks_to_index:
    - ticker: NFLX
    - ticker: META
  index_strategy:
    type: equal
  forecast:
    days: 60
```
slopp / credits.sql
Created September 28, 2023 21:01
Credits Dagster OSS
```sql
with events as (
    select distinct
        DATE_FORMAT(timestamp, '%Y-%m') as event_month,
        dagster_event_type,
        concat(run_id, '||', step_key) as step_id, -- concat (not coalesce) to build a unique step id
        count(1) as credits
    from event_logs
    where dagster_event_type = 'STEP_START'
```
slopp / observable_source_assets1.py
Last active August 10, 2023 17:17
Example of Observable Source Assets
```python
# run with `dagster dev -f observable_source_assets1.py`, then enable the auto-materialization daemon
# the downstreams will update as the live data is observed
from dagster import observable_source_asset, asset, AutoMaterializePolicy, DataVersion
from datetime import datetime


def today():
    return datetime.today().date()


def now():
```
slopp / example_of_observable_assets.py
Last active August 9, 2023 16:17
example_of_observable_assets.py
```python
@observable_source_asset(
    auto_observe_interval_minutes=24 * 60
)
def daily_data():
    return DataVersion(str(today()))


@observable_source_asset(
    auto_observe_interval_minutes=10
)
def live_data():
```
slopp / README.md
Last active March 23, 2024 01:51
Dynamic pipeline that invokes k8 ops

Dynamic Pipeline

This example shows the pseudo-code for a Dagster pipeline that:

  1. Accepts the path to a raw dataset as a string
  2. Runs a step to break the raw dataset into partitions
  3. For each partition, runs a series of two processing steps. Each processing step calls out to a Docker container, supplying the partition key as an input argument. The partitions are processed in parallel before being collected by a final step that operates on all of the partitions.

To run the pipeline:

slopp / README.md
Created July 12, 2023 16:33
Ops and Jobs Example

Ops and Jobs Example

This example shows how to use ops to create a graph of tasks executed as a Dagster job with specific configuration and resources. The example also shows how to use a custom schedule.

To start:

```
pip install dagster dagit
dagster dev -f ops_example.py
```
slopp / 3d_multi_partitions.py
Last active July 3, 2023 21:22
Alternative to Multi-multi Partitions
```python
from dagster import DailyPartitionsDefinition
from dagster import asset, OpExecutionContext, Definitions, AssetKey
from itertools import permutations

# Root Nodes
date = DailyPartitionsDefinition(start_date="2023-06-01")
colors = ["blue", "red"]
```
slopp / users_by_role.py
Created July 3, 2023 17:25
Get Users by Role
```python
from gql import Client, gql
from gql.transport.requests import RequestsHTTPTransport
import os
import pandas as pd
from datetime import datetime, timedelta

USER_GRANTS_QUERY = """
query UsersByRole {
  usersOrError {
```
slopp / ReadMe.md
Created May 5, 2023 16:39
Dagster Cloud External Compute Logs

In release 1.3.3, Dagster introduced the ability to display a link to compute logs instead of displaying the logs directly. This capability is important for Dagster Cloud users who do not want to send compute logs to Dagster Cloud but still want end users to be able to access logs while debugging a run.

This capability is possible because of additions to the compute log manager. Users can implement their own compute log manager for full control over the link behavior, or use the default dagster-aws implementation, which stores logs in S3 and displays a link to the log file:

[Screenshot: the Dagster UI displaying an external link to the compute logs]

*Note: in 1.3.3 the displayed link points directly to the S3 object. In 1.3.4 the displayed link points to the S3 console page for the log object, which provides a better experience for non-public S3 buckets.*
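A minimal `dagster.yaml` sketch for the default dagster-aws implementation described above; the bucket and prefix are placeholders, and the exact keys (including `show_url_only`) should be checked against the dagster-aws `S3ComputeLogManager` documentation:

```yaml
# dagster.yaml (placeholder bucket/prefix; verify keys against dagster-aws docs)
compute_logs:
  module: dagster_aws.s3.compute_log_manager
  class: S3ComputeLogManager
  config:
    bucket: "my-compute-log-bucket"
    prefix: "compute-logs"
    show_url_only: true  # display a link instead of streaming log contents
```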