Skip to content

Instantly share code, notes, and snippets.

@meyer1994
Created March 18, 2023 16:54
Show Gist options
  • Save meyer1994/f6d6c4aefe5580956b4a6c6978483b7c to your computer and use it in GitHub Desktop.
Save meyer1994/f6d6c4aefe5580956b4a6c6978483b7c to your computer and use it in GitHub Desktop.
Dagster sensor that executes for all data partitions
CONFIG_SENSOR = {
'job': jobs.daily,
'monitored_assets': [AssetKey(['b3', 'history', 'daily'])],
}
@multi_asset_sensor(**CONFIG_SENSOR)
def daily(context):
"""
https://discuss.dagster.io/t/2750336/can-an-asset-sensor-trigger-a-run-per-materialized-partition#22ab5516-a460-4ae5-87cb-35c27369f6af
"""
asset = AssetKey(['b3', 'history', 'daily'])
records = context.latest_materialization_records_by_partition(asset)
for partition, record in records.items():
config = {'ops': {'split': {'config': {'key': partition}}}}
yield RunRequest(run_config=config)
context.advance_cursor({asset: record})
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment