Skip to content

Instantly share code, notes, and snippets.

@gfranxman
Created June 17, 2022 19:24
Show Gist options
  • Save gfranxman/109f3e1df0916c155a6b0ce49c848a6a to your computer and use it in GitHub Desktop.
Save gfranxman/109f3e1df0916c155a6b0ce49c848a6a to your computer and use it in GitHub Desktop.
skip all airflow catchup runs.
def abort_on_catchup(**context):
    """
    Branch callable: let a run proceed, or divert a stale catchup run.

    Reads ``task``, ``run_id``, ``next_execution_date`` and (only when
    skipping) ``dag`` from the keyword context it is called with.

    Returns:
        A list of the current task's downstream task ids when the run may
        proceed, i.e. when ``run_id`` starts with ``"manual"`` or when
        ``now("UTC")`` minus the allowed start window is still earlier than
        ``next_execution_date``. Otherwise a one-item list holding the task
        id of the second-to-last entry of ``dag.tasks``.

    Raises:
        IndexError: on the skip path only, if ``dag.tasks`` has fewer than
            two entries.
    """
    # "Catchups" during this window are allowed.
    # This is just to cover for late-starting jobs.
    allowed_execution_start_window = 10  # minutes.

    this_task = context.get("task")
    next_tasks = list(this_task.downstream_task_ids)

    # Manual runs never need the clock or the skip target, so decide first.
    if context["run_id"].startswith("manual"):
        logging.info("manual tasks can always proceed.")
        return next_tasks

    current_time = now("UTC")
    next_execution_date = context["next_execution_date"]
    window_start = current_time.subtract(minutes=allowed_execution_start_window)
    if window_start < next_execution_date:
        return next_tasks

    logging.info(
        "This appears to be a skippable catchup run,"
        f" outside the {allowed_execution_start_window} minute execution window."
    )
    logging.info(
        f"{current_time=} is greater than next_execution_date: {next_execution_date}"
    )
    logging.info(f"current_time: {current_time}")
    # NOTE(review): the one-day offset presumably assumes a daily schedule;
    # confirm against the DAG's schedule_interval.
    logging.info(
        f"Next scheduled execution datetime: {next_execution_date.add(days=1)}"
    )

    # Resolved only on the skip path. The original author observed that this
    # branch task is the last entry of dag.tasks, so -2 is the "true last"
    # task. NOTE(review): this depends on task ordering; confirm it holds.
    last_task = context.get("dag").tasks[-2]
    # Routine branching information, not an error condition.
    logging.info(f"{last_task.task_id=}")
    return [last_task.task_id]
# Branch operator whose callable, abort_on_catchup, returns the task id(s)
# to follow for the current run.
branch = BranchPythonOperator(
    python_callable=abort_on_catchup,
    task_id="skip_catchups",
)
# `tasks` is defined outside this snippet -- presumably the list of this
# DAG's operators; verify against the surrounding DAG definition.
tasks.append(branch)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment