Airflow DB Cleanup DAG using 'airflow db clean' utils
# MIT License
# Copyright (c) 2024 Saroj Tripathi
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
doc_md = f""" A maintenance DAG to clean up Airflow Metastore. This will automatically clean up the entries before \n
DEFAULT_MAX_DB_ENTRY_AGE_IN_DAYS days in tables listed in TABLES_TO_DELETE. By default, it runs daily (Adjust: SCHEDULE_INTERVAL). \n
Use VERBOSE and DRYRUN for testing purposes.
"""
import os
from datetime import datetime, timedelta

from airflow.models import DAG, Variable
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago
DAG_ID = os.path.basename(__file__).replace(".pyc", "").replace(".py", "")
START_DATE = days_ago(1)
SCHEDULE_INTERVAL = "@daily"
DAG_OWNER_NAME = "operations"
ALERT_EMAIL_ADDRESSES = []
# Age threshold in days; entries older than this are cleaned up.
DEFAULT_MAX_DB_ENTRY_AGE_IN_DAYS = int(
    Variable.get("airflow_db_cleanup_age_in_days", 1)
)
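# The retention window can also be changed without redeploying the DAG, e.g. via
# the Airflow CLI:
#   airflow variables set airflow_db_cleanup_age_in_days 30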
# Verbose mode. Turn off to avoid noisy log output.
VERBOSE = False
# Dry-run mode: nothing is deleted; only the number of affected entries is reported.
DRYRUN = False
# List of tables to clean.
TABLES_TO_DELETE = [
    'job',
    'dag_run',
    'task_instance',
    'log',
    'xcom',
    'sla_miss',
    'task_reschedule',
    'task_fail',
    'celery_taskmeta',
    'dataset_event',
]
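# NOTE: table availability depends on your setup -- celery_taskmeta exists only when
# Celery's result backend is stored in this database, and dataset_event requires
# Airflow >= 2.4. Trim this list to match your environment.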
default_args = {
    'owner': DAG_OWNER_NAME,
    'depends_on_past': False,
    'email': ALERT_EMAIL_ADDRESSES,
    'email_on_failure': True,
    'email_on_retry': False,
    'start_date': START_DATE,
    'retries': 1,
    'retry_delay': timedelta(minutes=1),
}
# `airflow db clean` requires Airflow >= 2.3, so catchup and doc_md can be set
# directly in the constructor instead of via hasattr() guards.
dag = DAG(
    DAG_ID,
    default_args=default_args,
    schedule_interval=SCHEDULE_INTERVAL,
    start_date=START_DATE,
    catchup=False,
    doc_md=doc_md,
    tags=['internal', 'airflow', 'cleanup', 'db'],
)
# Cut-off timestamp, computed when the DAG file is parsed.
clean_before_date = datetime.utcnow() - timedelta(days=DEFAULT_MAX_DB_ENTRY_AGE_IN_DAYS)
clean_up = BashOperator(
    task_id="cleanup_db",
    bash_command=(
        "airflow db clean --clean-before-timestamp '{{ params.DATE }}' "
        "{{ params.VERBOSE }} {{ params.DRYRUN }} -y --skip-archive "
        "-t {{ params.TABLES }}"
    ),
    params={
        'DATE': clean_before_date,
        'VERBOSE': '-v' if VERBOSE else '',
        'DRYRUN': '--dry-run' if DRYRUN else '',
        'TABLES': ','.join(TABLES_TO_DELETE),
    },
    dag=dag,
)
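# Illustrative rendered command, assuming default settings (the timestamp is an
# example; the real value is whatever clean_before_date evaluated to at parse time):
#   airflow db clean --clean-before-timestamp '2024-02-11 00:00:00' -y --skip-archive \
#     -t job,dag_run,task_instance,log,xcom,sla_miss,task_reschedule,task_fail,celery_taskmeta,dataset_event
# --skip-archive deletes rows outright instead of copying them into the archive
# tables that `airflow db clean` otherwise creates; drop the flag if you want a
# recoverable copy of the purged rows.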