Skip to content

Instantly share code, notes, and snippets.

@qi-qi
Last active March 3, 2020 12:33
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save qi-qi/4d82d4c133cb8595fa2f7f180c129a6e to your computer and use it in GitHub Desktop.
Save qi-qi/4d82d4c133cb8595fa2f7f180c129a6e to your computer and use it in GitHub Desktop.
from airflow import DAG
from datetime import datetime, time
from airflow.operators.dummy_operator import DummyOperator
from airflow.sensors.time_sensor import TimeSensor
from airflow.sensors.sql_sensor import SqlSensor
default_args = {
"start_date": datetime(2020, 1, 1),
"email_on_failure": False,
"email_on_retry": False,
"retries": 0,
}
dag = DAG("team1_sqlsensor_demo", default_args=default_args, schedule_interval='@daily', catchup=False)
# To start with, for SQLSensor, please make sure:
# 1. mode is 'reschedule'
# 2. poke_interval should be larger than 900 seconds (15 minutes), eg. poke_interval=900
# 3. timeout should be less than 18000 seconds (5 hours)
sqlsensor1 = SqlSensor(
task_id="sqlsensor1",
sql='select true where 100 < (select num from "DEMO"."TEST"."AIRFLOW_SQLSENSOR_TEST");',
conn_id="jdbc-snowflake-demo",
mode='reschedule',
poke_interval=10,
timeout=18000,
dag=dag
)
# To start with, for SQLSensor, please make sure:
# 1. mode is 'reschedule'
# 2. poke_interval should be larger than 900 seconds (15 minutes), eg. poke_interval=900
# 3. timeout should be less than 18000 seconds (5 hours)
sqlsensor2 = SqlSensor(
task_id="sqlsensor2",
sql='select true where 100 < (select num from "DEMO"."TEST"."AIRFLOW_SQLSENSOR_TEST");',
conn_id="jdbc-snowflake-demo",
mode='reschedule',
poke_interval=10,
timeout=18000,
dag=dag
)
# To start with, for time sensor - please make sure:
# 1. mode is 'reschedule'
# 2. poke_interval should be larger than 1800 seconds (30 minutes), eg. poke_interval=900
time_sensor = TimeSensor(
task_id='TimeSensor2030',
target_time=time(hour=20, minute=30),
mode='reschedule',
poke_interval=900,
dag=dag
)
done1 = DummyOperator(
task_id='done1',
dag=dag,
)
done2 = DummyOperator(
task_id='done2',
dag=dag,
)
########## DAG ##########
sqlsensor1 >> done1
time_sensor >> sqlsensor2 >> done2
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment