Skip to content

Instantly share code, notes, and snippets.

@cmourouvin
Last active August 8, 2016 12:32
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save cmourouvin/e4a66458a11c7f28dfb7d75c8d54d0de to your computer and use it in GitHub Desktop.
Save cmourouvin/e4a66458a11c7f28dfb7d75c8d54d0de to your computer and use it in GitHub Desktop.
from airflow import DAG
from airflow.operators import BashOperator
from airflow.operators.sensors import TimeSensor
from datetime import datetime, timedelta, time
# Default arguments handed to every task created in the DAG below.
default_args = dict(
    owner='cedricm',
    # A task instance runs only if the same task succeeded in the previous
    # scheduled run (Airflow's depends_on_past semantics).
    depends_on_past=True,
    start_date=datetime(2016, 7, 30),
    # Alerting: send mail when a task fails, but not when it is retried.
    email=['test@test.fr'],
    email_on_failure=True,
    email_on_retry=False,
    # A single retry, attempted five minutes after a failure.
    retries=1,
    retry_delay=timedelta(minutes=5),
    # Further optional defaults, currently disabled:
    # queue='bash_queue',
    # pool='backfill',
    # priority_weight=10,
    # end_date=datetime(2016, 1, 1),
)
# Written against Airflow 1.7.1.3.
#
# Problem: a daily DAG meant to run at a specific time of day runs twice on
# its first day (start_date): once at 00:00 and once at the requested hour.
#
# Workaround: schedule the DAG with "@daily" and place a TimeSensor upstream
# of the task(s) that must run at a specific hour.
dag = DAG(
    'daily_firstday_twice_workaround',
    schedule_interval="@daily",
    default_args=default_args,
    # At most one active DAG run and one running task instance at a time.
    max_active_runs=1,
    concurrency=1,
)
# Gate: holds the downstream tasks until the time of day reaches 09:20
# (TimeSensor's target_time); this is the hour the work should start at.
taskWaitHour = TimeSensor(
    task_id='wait_specific_hour',
    target_time=time(9, 20, 0, 0),
    dag=dag,
)

# First piece of work, run once the sensor has released.
task1 = BashOperator(
    task_id='daily_firstday_twice_workaround_task1',
    bash_command='echo daily_firstday_twice_workaround_task1 ! Hello World',
    depends_on_past=True,
    dag=dag,
)

# Second piece of work, run after task1.
task2 = BashOperator(
    task_id='daily_firstday_twice_workaround_task2',
    bash_command='echo daily_firstday_twice_workaround_task2 ! Good Morning World !!!',
    depends_on_past=True,
    dag=dag,
)

# Dependency chain: wait_specific_hour -> task1 -> task2.
taskWaitHour.set_downstream(task1)
task1.set_downstream(task2)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment