@slopp
slopp / assets.py
Created June 27, 2024 18:04
Custom dagster asset decorator
from dagster import asset

# Add an attribute to all assets using this decorator without users having to adjust it.
def bi_team_asset(**asset_decorator_kwargs):
    def _wrapper(f):
        # Forward the caller's kwargs and pin the owner and asset name.
        @asset(**asset_decorator_kwargs, owners=["bi@corp.org"], name=f.__name__)
        def _impl(**kwargs):
            return f(**kwargs)
        return _impl
    return _wrapper
@slopp
slopp / config.json
Created June 27, 2024 17:27
Dagster asset factory example
[
  {
    "dag_id": "powerbi1",
    "python_script": "some_script.py",
    "dataset_id": "fdsjl4539fdjsk"
  },
  {
    "dag_id": "powerbi2",
    "python_script": "some_other_script.py",
    "dataset_id": "89fdskfds0"
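A minimal sketch of the factory idea behind this config, in plain Python so it runs without Dagster installed: one callable is generated per config entry. It uses the two entries visible above, closed off as valid JSON (an assumption, since the preview is truncated). The `build_task` helper and the returned dict are hypothetical, and the work of running the script and refreshing the dataset is replaced by a stub; in a Dagster project each generated function would presumably be wrapped with `@asset` instead.

```python
import json

# The two entries visible in the config preview, closed off as valid JSON (assumption).
CONFIG_JSON = """
[
  {"dag_id": "powerbi1", "python_script": "some_script.py", "dataset_id": "fdsjl4539fdjsk"},
  {"dag_id": "powerbi2", "python_script": "some_other_script.py", "dataset_id": "89fdskfds0"}
]
"""


def build_task(entry):
    """Hypothetical factory: return a function bound to one config entry."""

    def _task():
        # Stub: a real implementation would run the script and refresh the dataset.
        return {"ran": entry["python_script"], "refreshed": entry["dataset_id"]}

    # Name the generated function after the config entry, as a factory typically would.
    _task.__name__ = entry["dag_id"]
    return _task


# One generated callable per config entry, keyed by dag_id.
tasks = {entry["dag_id"]: build_task(entry) for entry in json.loads(CONFIG_JSON)}
results = {name: task() for name, task in tasks.items()}
```

Adding a third entry to the JSON produces a third callable with no code change, which is the main appeal of the factory pattern.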
@slopp
slopp / ReadMe.md
Last active May 13, 2024 20:59
Example of different automation policies

This example shows how to create an asset graph where the root assets are run by various schedules, and the downstreams are run using varying auto-materialization policies.

To install:

pip install dagster dagster-webserver
@slopp
slopp / README.md
Created February 27, 2024 16:14
SQL Server to GCS to BQ Dagster Pipeline Example

This example shows a skeleton for how to build a Dagster project that extracts tables from SQL Server, stores the extract as a CSV in GCS, and then uploads the GCS extract to BigQuery.

The actual extract and load logic is omitted; the purpose of this project is to show how such a pipeline can be represented as Dagster assets.
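The dependency chain described above can be sketched in plain Python, with each stage consuming the previous stage's output. Everything here is a stand-in: the function names, the `gs://example-bucket` path, the fake rows, and the in-memory `FAKE_GCS` store are hypothetical, and no SQL Server, GCS, or BigQuery call is made. In Dagster, each function would correspond to one asset.

```python
import csv
import io

FAKE_GCS = {}  # hypothetical in-memory stand-in for a GCS bucket


def extract_table(table):
    # Stand-in for querying SQL Server; returns fake rows.
    return [{"id": 1, "name": "a"}, {"id": 2, "name": "b"}]


def upload_csv(table, rows):
    # Serialize the rows to CSV and "upload" them to the fake bucket.
    buf = io.StringIO()
    writer = csv.DictWriter(buf, fieldnames=list(rows[0]))
    writer.writeheader()
    writer.writerows(rows)
    path = f"gs://example-bucket/{table}.csv"  # hypothetical bucket name
    FAKE_GCS[path] = buf.getvalue()
    return path


def load_to_bigquery(table, path):
    # Stand-in for a BigQuery load job; counts data rows, excluding the header.
    row_count = len(FAKE_GCS[path].splitlines()) - 1
    return {"table": table, "rows_loaded": row_count}


def run_table_pipeline(table):
    rows = extract_table(table)
    path = upload_csv(table, rows)
    return load_to_bigquery(table, path)


result = run_table_pipeline("customers")
```

Because each stage only needs the table name and the upstream output, the same three functions can be reused for every table.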

First, a single pipeline for one table is created. This is demonstrated in the file dagster_mock_one_table.py. To run this example:

  1. Create a Python virtual environment and then run:
pip install dagster dagster-webserver
@slopp
slopp / fn_profile.yaml
Created January 23, 2024 16:01
Dagster with a custom DSL
profile:
  name: FN
  stocks_to_index:
    - ticker: NFLX
    - ticker: META
  index_strategy:
    type: equal
  forecast:
    days: 60
@slopp
slopp / credits.sql
Created September 28, 2023 21:01
Credits Dagster OSS
with events as (select distinct
    DATE_FORMAT(timestamp, '%Y-%m') as event_month,
    dagster_event_type,
    -- concat (not coalesce) so the step ID combines run_id and step_key
    concat(run_id, '||', step_key) as step_id,
    count(1) as credits
from event_logs
where dagster_event_type = 'STEP_START'
@slopp
slopp / observable_source_assets1.py
Last active August 10, 2023 17:17
Example of Observable Source Assets
# run with dagster dev -f observable_source_assets1.py, then enable the auto materialization daemon
# the downstreams will update as the live data is observed
from dagster import observable_source_asset, asset, AutoMaterializePolicy, DataVersion
from datetime import datetime

def today():
    return datetime.today().date()

def now():
@slopp
slopp / example_of_observable_assets.py
Last active August 9, 2023 16:17
example_of_observable_assets.py
@observable_source_asset(
    auto_observe_interval_minutes=24*60
)
def daily_data():
    return DataVersion(str(today()))

@observable_source_asset(
    auto_observe_interval_minutes=10
)
def live_data():
@slopp
slopp / README.md
Last active March 23, 2024 01:51
Dynamic pipeline that invokes k8 ops

Dynamic Pipeline

This example shows the pseudocode for a Dagster pipeline that:

  1. Accepts the path to a raw dataset as a string
  2. Runs a step to break the raw dataset into partitions
  3. For each partition, runs a series of two processing steps. Each processing step calls out to a Docker container, passing the partition key as an input argument. The partitions are processed in parallel and then collected in a final processing step that operates on all of them.
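
The three steps above follow a fan-out/fan-in shape, sketched here in plain Python with a thread pool standing in for parallel execution. The function names and the fixed three-way split are hypothetical, and the Docker container calls are replaced by string operations; this illustrates the shape only and is not the gist's Dagster code.

```python
from concurrent.futures import ThreadPoolExecutor


def split_into_partitions(raw_path):
    # Hypothetical split: derive three partition keys from the raw dataset path.
    return [f"{raw_path}#part{i}" for i in range(3)]


def process_step_one(partition_key):
    # Stand-in for the first container call, which receives the partition key.
    return f"step1({partition_key})"


def process_step_two(intermediate):
    # Stand-in for the second container call, fed by the first step's output.
    return f"step2({intermediate})"


def process_partition(partition_key):
    # The two processing steps run in series for a single partition.
    return process_step_two(process_step_one(partition_key))


def collect(results):
    # Final fan-in step that sees the results for all partitions at once.
    return sorted(results)


def run_pipeline(raw_path):
    keys = split_into_partitions(raw_path)
    # Fan out: partitions are processed concurrently.
    with ThreadPoolExecutor() as pool:
        processed = list(pool.map(process_partition, keys))
    return collect(processed)


output = run_pipeline("raw.csv")
```

The number of partitions is only known after the split step runs, which is why this shape calls for a dynamic graph instead of a fixed set of steps.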

To run the pipeline:

@slopp
slopp / README.md
Created July 12, 2023 16:33
Ops and Jobs Example

Ops and Jobs Example

This example shows how to use ops to create a graph of tasks executed as a Dagster job with specific configuration and resources. The example also shows how to use a custom schedule.

To start:

pip install dagster dagit
dagster dev -f ops_example.py