Skip to content

Instantly share code, notes, and snippets.

@ftfarias
Created January 23, 2019 18:18
Show Gist options
  • Save ftfarias/af7f0d92218472e280eb8da0d946162a to your computer and use it in GitHub Desktop.
Save ftfarias/af7f0d92218472e280eb8da0d946162a to your computer and use it in GitHub Desktop.
How to clean up Airflow DAGs
import sqlalchemy
import sys
def print_clear_dag(dag_id):
    """Print the SQL that would remove a DAG's rows; nothing is executed.

    For each table in the hard-coded list, two ``DELETE`` statements are
    written to stdout, each terminated with ``;``: one matching ``dag_id``
    exactly and one matching it with ``LIKE`` (so a value containing SQL
    wildcards such as ``%`` acts as a pattern).

    Args:
        dag_id: DAG identifier (or LIKE pattern) interpolated verbatim into
            the statements. It is not escaped, so review the printed SQL
            before running it by hand.
    """
    list_tables = ['xcom', 'task_instance', 'sla_miss', 'log', 'job', 'dag_run', 'dag',
                   'dag_stats', 'task_fail']
    for table in list_tables:
        queries = ["DELETE FROM {} WHERE dag_id='{}'".format(table, dag_id),
                   "DELETE FROM {} WHERE dag_id LIKE '{}'".format(table, dag_id)]
        for query in queries:
            print("{};".format(query))
def clear_dag(dag_id, uri='mysql+mysqldb:// /airflow'):
    """Delete every row belonging to a DAG from the Airflow metadata tables.

    For each table in the hard-coded list, two ``DELETE`` statements are
    executed: one matching ``dag_id`` exactly and one matching it with
    ``LIKE`` (so a value containing SQL wildcards such as ``%`` acts as a
    pattern). Each statement is echoed to stdout before it runs.

    All statements run inside a single transaction that is committed when
    the loop finishes and rolled back if any statement raises.

    Args:
        dag_id: DAG identifier (or LIKE pattern) whose rows are removed.
        uri: SQLAlchemy database URL of the metadata database.
            NOTE(review): the default looks like a redacted placeholder
            (no host or credentials) -- pass a real URL or edit the default.

    Raises:
        sqlalchemy.exc.SQLAlchemyError: if connecting or any DELETE fails
            (for example, a listed table does not exist in this schema).
    """
    list_tables = ['xcom', 'task_instance', 'sla_miss', 'log', 'job', 'dag_run', 'dag',
                   'dag_stats', 'task_fail']
    # The second placeholder receives either the quoted literal (for the log
    # line) or a bind-parameter marker (for the statement actually executed).
    templates = ("DELETE FROM {} WHERE dag_id={}",
                 "DELETE FROM {} WHERE dag_id LIKE {}")
    engine = sqlalchemy.create_engine(uri)
    # engine.begin() commits on success and rolls back on error; a plain
    # engine.connect() never commits under SQLAlchemy 2.x.
    with engine.begin() as conn:
        for table in list_tables:
            for template in templates:
                shown = template.format(table, "'{}'".format(dag_id))
                print("\t: running {}".format(shown))
                # Table names come from the fixed list above; the
                # caller-supplied dag_id is always sent as a bound parameter
                # rather than spliced into the SQL text.
                statement = sqlalchemy.text(template.format(table, ":dag_id"))
                conn.execute(statement, {"dag_id": dag_id})
if __name__ == "__main__":
    # Script entry point: expects the DAG id as the first command-line
    # argument. Only the SQL is printed here; nothing is executed against
    # the database.
    if len(sys.argv) < 2:
        sys.exit("usage: {} <dag_id>".format(sys.argv[0]))
    print("Deleting {}".format(sys.argv[1]))
    print_clear_dag(sys.argv[1])
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment