Skip to content

Instantly share code, notes, and snippets.

@C0DK
Created January 2, 2023 19:00
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save C0DK/deba711251478c99ab1de95e4db6e393 to your computer and use it in GitHub Desktop.
Save C0DK/deba711251478c99ab1de95e4db6e393 to your computer and use it in GitHub Desktop.
@multi_asset_sensor(
    name="post_ingress_sensor",
    asset_keys=all_keys,
    job=post_ingress_job,
    default_status=DefaultSensorStatus.RUNNING,
)
def sensor(context: MultiAssetSensorEvaluationContext):
    """Request a post-ingress run for each freshly materialized, tagged asset.

    Walks the latest materialization record of every monitored asset key.
    Keys without a record are ignored. For the rest, the asset definition's
    metadata for that key is inspected: only assets whose ``specific_key``
    entry equals ``"expected_key"`` produce a run request; assets with no
    metadata, or without that entry, are skipped with an info log.

    Args:
        context: Evaluation context supplied by Dagster for this sensor tick.

    Yields:
        RunRequest: One request per matching asset. The run key combines the
        asset key with the run id of the materialization, and the run config
        carries the ``database``, ``schema`` and ``table`` metadata values as
        ``base_folder``, ``schema_name`` and ``table_name``.

    Raises:
        KeyError: If a matching asset lacks a ``database``, ``schema`` or
            ``table`` metadata entry.
    """
    logger = get_dagster_logger()
    records_by_key = context.latest_materialization_records_by_key()
    for asset_key, materialization in records_by_key.items():
        if materialization is None:
            continue

        asset = context.assets_defs_by_key[asset_key]
        # Fail loudly if the definition lookup yields nothing.
        assert asset is not None

        # `.get` may return None when the definition has no metadata for
        # this key; treat that the same as "marker not present" instead of
        # crashing on a None subscript.
        metadata = asset.metadata_by_key.get(asset_key) or {}
        if metadata.get("specific_key") != "expected_key":
            logger.info(f"Skipping {asset_key=} as it isn't in Foo")
            continue

        # A matching asset is expected to be fully described; a missing
        # entry here is a configuration error and should surface.
        base_folder = metadata["database"]
        schema_name = metadata["schema"]
        table_name = metadata["table"]

        logger.info(f"Loading {asset_key=} into raw based on metadata")
        yield RunRequest(
            run_key=f"{asset_key}_{materialization.event_log_entry.run_id}",
            run_config={
                "base_folder": base_folder,
                "schema_name": schema_name,
                # Previously this was mistakenly set to schema_name.
                "table_name": table_name,
            },
        )

    # Mark every record seen in this tick as consumed so the next tick only
    # sees newer materializations.
    context.advance_all_cursors()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment