@cicdw
Last active May 15, 2020 05:13
Custom Prefect Trigger
# Imports assume Prefect Core 0.x, the release line current when this gist was written.
from typing import Dict

from prefect.core.edge import Edge
from prefect.engine.signals import TRIGGERFAIL
from prefect.engine.state import State


def aggregation_skipped_trigger(upstream_states: Dict[Edge, State]) -> bool:
    """
    Custom trigger that fails if the aggregation task was skipped,
    or if any upstream task did not succeed.
    """
    # upstream_states is a dictionary of _all_ upstream dependencies,
    # but this trigger only singles out the task named "aggregation"
    agg_state = [
        state
        for edge, state in upstream_states.items()
        if edge.upstream_task.name == "aggregation"
    ]
    if agg_state and agg_state[0].is_skipped():
        raise TRIGGERFAIL("Aggregation Task skipped, cannot proceed.")
    elif not all(s.is_successful() for s in upstream_states.values()):
        raise TRIGGERFAIL("Some upstream tasks did not succeed, cannot proceed.")
    return True
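
The trigger's branching can be exercised without a Prefect installation. The sketch below is only an illustration: `TriggerFail`, `FakeState`, `FakeTask`, `FakeEdge`, and `outcome` are hypothetical stand-ins that are not part of Prefect, and the trigger body is copied from the gist with `TRIGGERFAIL` swapped for the stand-in exception. It assumes, as in Prefect 0.x to the best of my knowledge, that a skipped state also reports `is_successful()` as true.

```python
class TriggerFail(Exception):
    """Hypothetical stand-in for prefect.engine.signals.TRIGGERFAIL."""


class FakeState:
    """Hypothetical stand-in for a Prefect State (only the methods the trigger calls)."""

    def __init__(self, successful: bool, skipped: bool = False):
        self._successful = successful
        self._skipped = skipped

    def is_successful(self) -> bool:
        return self._successful

    def is_skipped(self) -> bool:
        return self._skipped


class FakeTask:
    """Hypothetical stand-in for a Task; only the name is needed."""

    def __init__(self, name: str):
        self.name = name


class FakeEdge:
    """Hypothetical stand-in for an Edge; exposes edge.upstream_task.name."""

    def __init__(self, upstream_name: str):
        self.upstream_task = FakeTask(upstream_name)


def aggregation_skipped_trigger(upstream_states) -> bool:
    # Same logic as the gist, raising the stand-in exception instead.
    agg_state = [
        state
        for edge, state in upstream_states.items()
        if edge.upstream_task.name == "aggregation"
    ]
    if agg_state and agg_state[0].is_skipped():
        raise TriggerFail("Aggregation Task skipped, cannot proceed.")
    elif not all(s.is_successful() for s in upstream_states.values()):
        raise TriggerFail("Some upstream tasks did not succeed, cannot proceed.")
    return True


def outcome(upstream_states) -> str:
    """Return "run" if the trigger passes, otherwise the failure message."""
    try:
        aggregation_skipped_trigger(upstream_states)
        return "run"
    except TriggerFail as exc:
        return str(exc)


# Assumed state flags: success, skipped (counts as successful), and failed.
success = FakeState(successful=True)
skipped = FakeState(successful=True, skipped=True)
failed = FakeState(successful=False)

all_ok = {FakeEdge("aggregation"): success, FakeEdge("load"): success}
agg_skipped = {FakeEdge("aggregation"): skipped, FakeEdge("load"): success}
other_skipped = {FakeEdge("aggregation"): success, FakeEdge("load"): skipped}
one_failed = {FakeEdge("aggregation"): success, FakeEdge("load"): failed}
```

Under those assumptions, a skipped "aggregation" task blocks the downstream task while a skipped task with any other name does not, which is the one way this trigger differs from a plain "all upstream tasks succeeded" check. In Prefect 0.x the real function would be attached through the `trigger` keyword when defining the downstream task.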