Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save camilomartinez/84c5a8bb41ad687ef0b32369a030cdc0 to your computer and use it in GitHub Desktop.
Save camilomartinez/84c5a8bb41ad687ef0b32369a030cdc0 to your computer and use it in GitHub Desktop.
Callback to clear Airflow SubDag on retry
import logging
from airflow.models import DagBag
def callback_subdag_clear(context):
"""Clears a subdag's tasks on retry."""
dag_id = "{}.{}".format(
context['dag'].dag_id,
context['ti'].task_id,
)
execution_date = context['execution_date']
task = context['task']
sdag = task.subdag
if sdag is None:
raise Exception("Can't find dag {}".format(dag_id))
else:
logging.info("Clearing SubDag: {} {}".format(dag_id, execution_date))
sdag.clear(
start_date=execution_date,
end_date=execution_date,
only_failed=False,
only_running=False,
confirm_prompt=False,
include_subdags=False)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment