Skip to content

Instantly share code, notes, and snippets.

@slopp
Last active August 10, 2023 17:17
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save slopp/62b268322771fad6f18d83587718d4e7 to your computer and use it in GitHub Desktop.
Save slopp/62b268322771fad6f18d83587718d4e7 to your computer and use it in GitHub Desktop.
Example of Observable Source Assets
# Run with `dagster dev -f observable_source_assets1.py`, then enable the auto-materialization daemon.
# The downstream assets will update as the live data is observed.
from dagster import observable_source_asset, asset, AutoMaterializePolicy, DataVersion
from datetime import datetime
def today():
    """Return the current local calendar date.

    Returns:
        datetime.date: The date part of ``datetime.today()``.
    """
    return datetime.today().date()
def now():
    """Return the current local date and time.

    Returns:
        datetime.datetime: A naive (timezone-unaware) timestamp from
        ``datetime.now()``.
    """
    return datetime.now()
@observable_source_asset(
    auto_observe_interval_minutes=24 * 60,  # 1440 minutes, i.e. once per day
)
def daily_data():
    """Report today's date as the data version of the daily source.

    The version string is ``str(today())``, so it only changes when the
    calendar date changes.

    Returns:
        DataVersion: Version wrapping the current date string.
    """
    return DataVersion(str(today()))
@observable_source_asset(
    auto_observe_interval_minutes=2,
)
def live_data():
    """Report the current timestamp as the data version of the live source.

    The version string is ``str(now())``, so every observation yields a
    different version than the previous one.

    Returns:
        DataVersion: Version wrapping the current timestamp string.
    """
    return DataVersion(str(now()))
@asset(
    deps=[daily_data, live_data],
    auto_materialize_policy=AutoMaterializePolicy.eager(),
)
def downstream():
    """Placeholder asset depending on both ``daily_data`` and ``live_data``.

    The body does no work and returns ``None``; the asset exists to show the
    eager auto-materialize policy reacting to its upstream sources.
    """
    return
@asset(
    deps=[downstream],
    auto_materialize_policy=AutoMaterializePolicy.eager(),
)
def downstream_downstream():
    """Placeholder asset one step further down, depending on ``downstream``.

    The body does no work and returns ``None``; it carries an eager
    auto-materialize policy like its parent.
    """
    return
# Run with `dagster dev -f observable_source_assets2.py`, then enable the auto-materialization daemon.
# Materialize the asset "downstream" manually.
# If you have an eager policy on downstream_downstream, it will update immediately.
# If you have a lazy policy on downstream_downstream, it won't run, but it _also_ won't be marked overdue.
from dagster import observable_source_asset, asset, AutoMaterializePolicy, DataVersion, FreshnessPolicy
from datetime import datetime
def today():
    """Return the current local calendar date.

    Returns:
        datetime.date: The date part of ``datetime.today()``.
    """
    return datetime.today().date()
def now():
    """Return the current local date and time.

    Returns:
        datetime.datetime: A naive (timezone-unaware) timestamp from
        ``datetime.now()``.
    """
    return datetime.now()
@observable_source_asset(
    auto_observe_interval_minutes=24 * 60,  # 1440 minutes, i.e. once per day
)
def daily_data():
    """Report today's date as the data version of the daily source.

    The version string is ``str(today())``, so it only changes when the
    calendar date changes.

    Returns:
        DataVersion: Version wrapping the current date string.
    """
    return DataVersion(str(today()))
@asset(
    deps=[daily_data],
    auto_materialize_policy=AutoMaterializePolicy.eager(),
)
def downstream():
    """Placeholder asset depending on ``daily_data``.

    The body does no work and returns ``None``; the asset carries an eager
    auto-materialize policy.
    """
    return
@asset(
    deps=[downstream],
    auto_materialize_policy=AutoMaterializePolicy.lazy(),
    freshness_policy=FreshnessPolicy(maximum_lag_minutes=1),
)
def downstream_downstream():
    """Placeholder asset pairing a lazy policy with a 1-minute freshness policy.

    Depends on ``downstream``. The body does no work and returns ``None``;
    the asset exists to compare lazy against eager auto-materialization
    after ``downstream`` is materialized.
    """
    return
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment