Skip to content

Instantly share code, notes, and snippets.

@4sushi
Created July 20, 2021 13:55
Show Gist options
  • Save 4sushi/d2e36598e94f1dc0e4f812a1a5130bb8 to your computer and use it in GitHub Desktop.
Save 4sushi/d2e36598e94f1dc0e4f812a1a5130bb8 to your computer and use it in GitHub Desktop.
Airflow use multiple schedule interval for one DAG. Condition based on date execution. Ex: run daily + monthly
from datetime import timedelta
from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.operators.python_operator import BranchPythonOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.trigger_rule import TriggerRule
from croniter import croniter
import dateutil.parser
default_args = {
"start_date": days_ago(1),
"owner": "XXX",
"email": ["XXX"],
"email_on_failure": True,
"email_on_retry": False,
"retries": 1,
"retry_delay": timedelta(minutes=5),
}
dag = DAG("test", default_args=default_args, schedule_interval="40 1 * * *")
def operator_monthly(**context):
schedul_interval = "* * 1 * *"
date_execution_iso = context['ts'] # https://airflow.apache.org/docs/apache-airflow/stable/macros-ref.html#
dt = dateutil.parser.isoparse(date_execution_iso)
if croniter.match(schedul_interval, dt):
return run_monthly.task_id
else:
return run_daily.task_id
check_run_monthly = BranchPythonOperator(
task_id='check_run_monthly',
python_callable=operator_monthly,
provide_context=True,
dag=dag,
retries=0,
)
run_monthly = DummyOperator(task_id='run_monthly', dag=dag)
run_daily = DummyOperator(task_id='run_daily', dag=dag, trigger_rule=TriggerRule.NONE_FAILED)
check_run_monthly >> run_monthly >> run_daily
@4sushi
Copy link
Author

4sushi commented Jul 20, 2021

image

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment