Skip to content

Instantly share code, notes, and snippets.

@revolutionisme
Created June 24, 2021 15:49
Show Gist options
  • Save revolutionisme/8246c6e2fa7d8abb7f6c393cb5218595 to your computer and use it in GitHub Desktop.
Save revolutionisme/8246c6e2fa7d8abb7f6c393cb5218595 to your computer and use it in GitHub Desktop.
Query the Airflow metadata database from within a PythonOperator
def set_task_status(**kwargs):
    """Print every row of the ``task_instance`` table of the Airflow metadata DB.

    Opens a session through ``settings.Session()``, runs a plain
    ``SELECT *`` against ``task_instance`` and prints the result's column
    keys followed by each row. Despite its name, the function does not
    modify any task state yet: the UPDATE it was written for is still
    disabled (see the comment in the body).

    Keyword Args:
        dag_id: Required. Only read for now; meant for the disabled UPDATE.
        task_id: Required. Only read for now; meant for the disabled UPDATE.
        start_date: Required. Only read for now; meant for the disabled UPDATE.
        end_date: Required. Only read for now; not referenced by any statement.

    Raises:
        KeyError: If any of the four keyword arguments above is missing.
    """
    # Looked up eagerly so that a missing key fails before a session is opened.
    dag_id = kwargs["dag_id"]
    task_id = kwargs["task_id"]
    start_date = kwargs["start_date"]
    end_date = kwargs["end_date"]

    session = settings.Session()
    try:
        print("session: ", str(session))

        # Disabled write path this function was written for:
        #   UPDATE task_instance SET state = 'success', try_number = 0
        #   WHERE task_id = :task_id AND dag_id = :dag_id
        #     AND execution_date = :start_date
        # When enabling it, pass task_id / dag_id / start_date as bound
        # parameters instead of interpolating them into the SQL string: the
        # values are supplied by whoever triggers the run.
        stmt = "Select * from task_instance;"

        # NOTE(review): newer SQLAlchemy releases expect raw SQL to be wrapped
        # in sqlalchemy.text() -- confirm against the installed version.
        result = session.execute(stmt)

        # Public accessor instead of the private ``result._metadata``.
        print(f"Attributes - {result.keys()}")
        for row in result:
            print(f"Result from task instance - {row}")
    finally:
        # Always hand the connection back, even if the query raised.
        session.close()
# Register the "backfill" task on `dag`; it runs set_task_status with the
# keyword arguments below. Every value is a Jinja template expression that
# reads from dag_run.conf:
#   * start_date / end_date / dag_id fall back to "" when dag_run is falsy.
#   * task_id falls back to None when dag_run is falsy or the conf has no
#     "task_id" key.
# NOTE(review): presumably the rendered values reach the callable as strings
# (so the None fallback would arrive as the text "None") -- confirm against
# the DAG's template rendering settings.
PythonOperator(
    dag=dag,
    task_id="backfill",
    python_callable=set_task_status,
    op_kwargs=dict(
        start_date='{{ dag_run.conf["start_date"] if dag_run else "" }}',
        end_date='{{ dag_run.conf["end_date"] if dag_run else "" }}',
        dag_id='{{ dag_run.conf["dag_id"] if dag_run else "" }}',
        task_id='{{ dag_run.conf["task_id"] if dag_run and "task_id" in dag_run.conf else None }}',
    ),
)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment