Example of different automation policies

This example shows how to create an asset graph where the root assets are run by various schedules and the downstream assets are updated by varying auto-materialization policies.


To install:

pip install dagster dagster-webserver

To run:

dagster dev -f definitions.py

To get started, click on "Overview" and then "Auto-Materialize". Toggle the status to running.

Next, click on "Assets" and "View Global Asset Lineage". Select the root asset nodes and launch a backfill for all partitions by clicking "Materialize".
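
If you prefer the command line, a single partition of a root asset can also be materialized directly. For example (the asset selection and partition value here are illustrative, and the exact flags can vary across Dagster versions):

dagster asset materialize -f definitions.py --select "SAP/a_table" --partition "2024-05-13"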

Shortly after the backfill completes, you will see the downstream staged assets execute automatically based on their auto-materialization policies.


You can play around with launching individual parent assets and seeing which downstreams are updated. If you select a downstream asset, the sidebar will open and describe the rules configured for that asset. If you click "View automation history" you can see the details of each evaluation the system performed.


You can also click on "Overview" and "Schedules" to enable any of the root asset automations (though you may want to adjust the schedule definitions in the code to avoid waiting days!).
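
For example, to watch the SAP schedule fire while testing, you could temporarily swap its cron expression for an every-five-minutes one:

sap_extract_daily = ScheduleDefinition(
    name="sap_extract_daily",
    job=sap_extract_job,
    cron_schedule="*/5 * * * *",  # every 5 minutes instead of daily at 08:00
)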

from dagster import (
    asset,
    AssetExecutionContext,
    DailyPartitionsDefinition,
    Definitions,
    define_asset_job,
    AssetSelection,
    AssetKey,
    ScheduleDefinition,
    AutoMaterializePolicy,
    AutoMaterializeRule,
)

# a partition definition is optional but allows us to "parameterize"
# the pipelines by date for doing incremental runs & backfills
daily_partitions = DailyPartitionsDefinition(start_date="2024-05-01")


# you can create assets in-line
@asset(
    group_name="SAP",
    key_prefix="SAP",
    partitions_def=daily_partitions
)
def a_table(context: AssetExecutionContext):
    context.log.info("Some information logged during execution")
    # do the work to update the SAP table for this partition,
    # using context.partition_key to know which date to process
    context.log.info(f"Updating partition {context.partition_key}")
    context.add_output_metadata({
        "some_key_for_catalog": "some_execution_time_value"
    })
    return


@asset(
    group_name="SAP",
    key_prefix="SAP",
    partitions_def=daily_partitions
)
def b_table(context: AssetExecutionContext):
    context.log.info("Some information logged during execution")
    # do the work to update the SAP table for this partition,
    # using context.partition_key to know which date to process
    context.log.info(f"Updating partition {context.partition_key}")
    context.add_output_metadata({
        "some_key_for_catalog": "some_execution_time_value"
    })
    return


# or create assets more dynamically using a factory approach
def create_alegro_producers(tables_to_produce):
    assets_to_create = []

    # build each producer inside a helper so the table name is bound per asset
    # (closing over the loop variable directly would make every producer log the
    # name of the last table in the list)
    def make_producer(table):
        @asset(
            group_name="alegro",
            key_prefix="alegro",
            partitions_def=daily_partitions,
            name=table
        )
        def alegro_producer(context: AssetExecutionContext):
            context.log.info(f"Some information logged during execution of {table}")
            # ...
            return

        return alegro_producer

    for table in tables_to_produce:
        assets_to_create.append(make_producer(table))
    return assets_to_create


alegro_assets = create_alegro_producers(["c_table", "c_prime_table"])

# create jobs & schedules for either a group of producers or individual producers
# sensors could also be used if you want jobs to run after external events instead
# of on cron
sap_extract_job = define_asset_job(
    name="sap_extract_job",
    selection=AssetSelection.groups("SAP")
)

sap_extract_daily = ScheduleDefinition(
    name="sap_extract_daily",
    job=sap_extract_job,
    cron_schedule="0 8 * * *"
)

alegro_extract_job_c = define_asset_job(
    name="alegro_extract_job_c",
    selection=AssetSelection.assets(AssetKey(["alegro", "c_table"]))
)

alegro_extract_daily = ScheduleDefinition(
    name="alegro_extract_daily",
    job=alegro_extract_job_c,
    cron_schedule="30 9 * * *",
)

alegro_extract_job_c_prime = define_asset_job(
    name="alegro_extract_job_c_prime",
    selection=AssetSelection.assets(AssetKey(["alegro", "c_prime_table"]))
)

alegro_extract_hourly = ScheduleDefinition(
    name="alegro_extract_hourly",
    job=alegro_extract_job_c_prime,
    cron_schedule="0 * * * *",
)
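
# A sensor could replace any of the cron schedules above when an extract should
# run in response to an external event instead. A minimal, commented-out sketch
# using Dagster's @sensor decorator; `external_files_have_landed` is a hypothetical
# placeholder for whatever check your system needs:
#
# from dagster import RunRequest, SensorEvaluationContext, sensor
#
# @sensor(job=sap_extract_job)
# def sap_landing_zone_sensor(context: SensorEvaluationContext):
#     if external_files_have_landed():
#         # request a run for the relevant partition
#         yield RunRequest(partition_key="2024-05-13")
#
# (to enable it, pass sensors=[sap_landing_zone_sensor] to Definitions at the bottom)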

# create consumers, typically the granularity remains 1 asset per table
@asset(
    group_name="staged",
    key_prefix="staged",
    # specify the upstreams
    deps=[
        AssetKey(["SAP", "a_table"]),
        AssetKey(["SAP", "b_table"]),
        AssetKey(["alegro", "c_prime_table"]),
    ],
    # and the scheduling condition, this one propagates changes of any upstream ASAP
    auto_materialize_policy=AutoMaterializePolicy.eager()
)
def consumer_d(context: AssetExecutionContext):
    # ...
    return


@asset(
    group_name="staged",
    key_prefix="staged",
    # specify the upstreams
    deps=[
        AssetKey(["SAP", "a_table"]),
        AssetKey(["alegro", "c_table"]),
    ],
    # and the scheduling condition, this one also propagates changes eagerly but
    # skips the run until all of its upstreams have been updated
    auto_materialize_policy=AutoMaterializePolicy.eager().with_rules(
        AutoMaterializeRule.skip_on_not_all_parents_updated()
    )
)
def consumer_f(context: AssetExecutionContext):
    # ...
    return
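
# Another policy variation, shown only as a commented-out sketch: add a cron-based
# trigger on top of the eager policy so a consumer also refreshes at a set time of
# day even if no upstream change was detected at that tick. The asset name below is
# illustrative and is not registered in the Definitions:
#
# @asset(
#     group_name="staged",
#     key_prefix="staged",
#     deps=[AssetKey(["alegro", "c_table"])],
#     auto_materialize_policy=AutoMaterializePolicy.eager().with_rules(
#         AutoMaterializeRule.materialize_on_cron("0 10 * * *")
#     ),
# )
# def consumer_cron_gated(context: AssetExecutionContext):
#     ...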

defs = Definitions(
    assets=[a_table, b_table, *alegro_assets, consumer_d, consumer_f],
    jobs=[sap_extract_job, alegro_extract_job_c, alegro_extract_job_c_prime],
    schedules=[sap_extract_daily, alegro_extract_daily, alegro_extract_hourly],
)