Skip to content

Instantly share code, notes, and snippets.

@nhammad
Last active July 27, 2023 11:53
Show Gist options
  • Save nhammad/564a1774100ebefb15246d14669b2c15 to your computer and use it in GitHub Desktop.
Save nhammad/564a1774100ebefb15246d14669b2c15 to your computer and use it in GitHub Desktop.
from airflow.contrib.sensors.python_sensor import PythonSensor
....
# DAG 'test_airflow_dag': start -> A -> (wait for QA check) -> B -> end.
# The QA gate is a PythonSensor that repeatedly calls run_qa_check until it
# returns a truthy value.
with DAG(
    'test_airflow_dag',
    description='testing_python_sensor',
    schedule_interval='30 5 * * *',
    # month=1, not month=01: Python 3 rejects decimal integer literals
    # written with a leading zero (SyntaxError).
    start_date=datetime(year=2020, month=1, day=12),
    catchup=False,
    default_args=args,
) as dag:
    # ..... (further setup was elided at this point in the original snippet)

    def run_qa_check(**kwargs):
        """Poke callable for the QA sensor: tell whether the QA query passed.

        Runs the QA query on the 'TEST_DWH' warehouse connection and looks at
        the first column of the first returned row.

        Args:
            **kwargs: Accepted so the callable can be invoked with extra
                keyword arguments; none of them are used.

        Returns:
            bool: True only when the first row's flag equals 'TRUE'
            (compared case-insensitively). False when the flag is anything
            else, when the query returns no rows, or when running the query
            fails. A falsy result makes the sensor poke again later, so a
            failure never lets downstream tasks start.

        Raises:
            Exception: whatever ``snow.get_ctx`` or ``con.cursor`` raise;
                only errors from executing/fetching the query are handled.
        """
        # SQL text is kept exactly as in the original snippet.
        # NOTE(review): without a WHERE clause or aggregate this yields one
        # row per row of my_table, yet only the first row is inspected
        # below -- confirm that is intended.
        query = """SELECT
CASE
WHEN DATE_DIFF('hour', your_date_column, CURRENT_TIMESTAMP) = 24 THEN 'TRUE'
ELSE 'FALSE'
END AS is_24_hours
FROM
my_table;
"""
        logging.info("Establishing DB connection")
        con = snow.get_ctx(wh='TEST_DWH')
        try:
            cur = con.cursor()
            logging.info("Running query")
            try:
                cur.execute(query)
                results = cur.fetchall()
            except Exception as e:
                # Report "not passed" instead of a truthy value: the sensor
                # treats any truthy return as "condition met", so an error
                # must not release the downstream tasks.
                logging.exception(
                    "Error while running query on the database %s", e
                )
                return False
            logging.info("Query ran successfully")
        finally:
            # Close the connection on every path, not only after an error.
            con.close()

        # The query emits the upper-case literals 'TRUE' / 'FALSE', so the
        # flag is normalised before comparing. An empty result set counts
        # as "not passed" rather than raising IndexError.
        if results and str(results[0][0]).strip().upper() == "TRUE":
            logging.info("QA CHECK PASSED")
            return True

        logging.info("QA CHECK NOT PASSED")
        return False

    start = DummyOperator(task_id='start')
    end = DummyOperator(task_id='end')

    A = PythonOperator(
        task_id='A',
        python_callable=function_a,
    )
    B = PythonOperator(
        task_id='B',
        python_callable=function_b,
    )

    # Gate between A and B: calls run_qa_check with a 600-second poke
    # interval in 'reschedule' mode.
    wait_for_condition = PythonSensor(
        task_id='wait_for_qa_check_completion',
        python_callable=run_qa_check,
        mode='reschedule',
        poke_interval=600,
    )

    start >> A >> wait_for_condition >> B >> end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment