@nathairtras
Last active May 28, 2021 15:47
Callback to clear Airflow SubDag on retry
import logging

from airflow.models import DagBag


def callback_subdag_clear(context):
    """Clears a subdag's tasks on retry."""
    # A subdag's dag_id is "<parent_dag_id>.<subdag_task_id>".
    dag_id = "{}.{}".format(
        context['dag'].dag_id,
        context['ti'].task_id,
    )
    execution_date = context['execution_date']
    # Re-parse the DAG folder and look up the subdag by its id.
    sdag = DagBag().get_dag(dag_id)
    logging.info("Clearing SubDag: {} {}".format(dag_id, execution_date))
    # Clear all of the subdag's task instances for this execution date only.
    sdag.clear(
        start_date=execution_date,
        end_date=execution_date,
        only_failed=False,
        only_running=False,
        confirm_prompt=False,
        include_subdags=False)
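The callback depends on the subdag's id being the parent DAG id and the subdag task id joined by a dot. The sketch below isolates that step so it can be checked without an Airflow install. The helper name `subdag_id_from_context` and the stand-in context values are hypothetical and are not part of the gist.

```python
from types import SimpleNamespace


def subdag_id_from_context(context):
    """Build the '<parent_dag_id>.<task_id>' id that the callback looks up."""
    return "{}.{}".format(context["dag"].dag_id, context["ti"].task_id)


# Stand-in context (hypothetical values); Airflow itself passes real DAG and
# TaskInstance objects under the same 'dag' and 'ti' keys.
fake_context = {
    "dag": SimpleNamespace(dag_id="parent_dag"),
    "ti": SimpleNamespace(task_id="load_section"),
}

print(subdag_id_from_context(fake_context))
```

In a real DAG, the callback would presumably be attached to the subdag task through the operator's `on_retry_callback` argument, with `retries` set to at least 1 so that the retry actually fires.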
@alistairmcintyre

Thank you for sharing this! This was the missing step I needed to clear a sub dag when one task within it failed.

I did hit an error when running logging.info, though: AttributeError: 'NoneType' object has no attribute 'info'. It may be a versioning conflict.

@maor-sa

maor-sa commented Nov 10, 2019

It worked great on Airflow 1.9 but failed on Airflow 1.10+
Any tips?

@nathairtras
Author

You may want to check out this fork and give it a try. It grabs the subdag variable differently, possibly to address the very issue you encountered.
https://gist.github.com/camilomartinez/84c5a8bb41ad687ef0b32369a030cdc0

I haven't used subdags since 2017. They were giving us more headaches than they were worth in 1.7, but a lot has changed since then. Don't have enough time right now to invest in exploring them again.
