Skip to content

Instantly share code, notes, and snippets.

@xnuinside
Last active January 18, 2021 12:12
Show Gist options
  • Save xnuinside/3bbaaf9a406a064b0054f32c105bb5d0 to your computer and use it in GitHub Desktop.
Save xnuinside/3bbaaf9a406a064b0054f32c105bb5d0 to your computer and use it in GitHub Desktop.
Apache Airflow: check that a table exists and get its schema name with a Python callable and PostgresHook
from datetime import datetime
from airflow.hooks.postgres_hook import PostgresHook
from airflow.operators.python_operator import PythonOperator
from airflow import DAG
with DAG(dag_id="postgres_check_table", start_date=datetime(2018, 10, 12)) as dag:

    def check_table_exist(sql_to_get_schema, sql_to_check_table_exist,
                          table_name):
        """Find the schema name, then check that ``table_name`` exists in it.

        Args:
            sql_to_get_schema: query run through ``PostgresHook.get_records``.
                The first returned row that has the value ``'airflow'`` in
                any column is selected, and its first column is used as the
                schema name.
            sql_to_check_table_exist: ``str.format`` template with two
                positional placeholders, filled with the schema name and
                the table name, in that order.
            table_name: name of the table to look for.

        Returns:
            True when the check query returns a row.

        Raises:
            ValueError: if no row of the schema query contains ``'airflow'``,
                or if the check query returns nothing.
        """
        # Hook is created with its default arguments.
        hook = PostgresHook()

        # Get the schema name. The ``else`` branch runs only when the loop
        # finishes without ``break``, i.e. no row matched; failing here
        # avoids using an unbound ``schema`` below.
        for row in hook.get_records(sql=sql_to_get_schema):
            if 'airflow' in row:
                schema = row[0]
                print(schema)
                break
        else:
            raise ValueError(
                "no row containing 'airflow' returned by: {}".format(
                    sql_to_get_schema))

        # Check that the table exists.
        # NOTE(review): the values are interpolated into the SQL text with
        # str.format, so this must only be called with trusted schema and
        # table names.
        found = hook.get_first(
            sql=sql_to_check_table_exist.format(schema, table_name))
        print(found)
        if not found:
            raise ValueError("table {} does not exist".format(table_name))
        return True

    # Queries shared by both tasks, so each task checks the same thing and
    # only the table name differs.
    SQL_GET_SCHEMA = "SELECT * FROM pg_tables;"
    SQL_CHECK_TABLE_EXIST = ("SELECT * FROM information_schema.tables "
                             "WHERE table_schema = '{}' "
                             "AND table_name = '{}';")

    # Expected to succeed (presumably "dag" is one of Airflow's own tables
    # in this database - confirm for your setup).
    table_name_success = "dag"
    get_pg_table = PythonOperator(
        task_id="check_table_success",
        python_callable=check_table_exist,
        op_args=[SQL_GET_SCHEMA, SQL_CHECK_TABLE_EXIST, table_name_success],
    )

    # Expected to fail: no table with this name is expected to exist.
    table_name_fail = "rock"
    get_rock_pg_table = PythonOperator(
        task_id="check_table_fail",
        python_callable=check_table_exist,
        op_args=[SQL_GET_SCHEMA, SQL_CHECK_TABLE_EXIST, table_name_fail],
    )
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment