Skip to content

Instantly share code, notes, and snippets.

@pwasiewi
Forked from semihsezer/nested_dag.py
Created February 3, 2022 01:06
Show Gist options
  • Save pwasiewi/1f56bf8a484696fa3027c1613029e7df to your computer and use it in GitHub Desktop.
Save pwasiewi/1f56bf8a484696fa3027c1613029e7df to your computer and use it in GitHub Desktop.
Creating Dynamic Nested Subdags in Airflow (subdags within subdags)
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.subdag_operator import SubDagOperator
from datetime import datetime, timedelta
# Arguments shared by the DAGs in this file (passed as default_args=...).
default_args = dict(
    owner='Airflow',
    start_date=datetime(2020, 3, 10),
)
# Python callable methods for python operator
def schedule_tasks_method(**kwargs):
    '''Python callable for the `schedule_tasks` operator.

    Args:
        **kwargs: Arbitrary keyword arguments; accepted and ignored.

    Returns:
        The fixed status string 'Scheduling many tasks...'.
    '''
    return 'Scheduling many tasks...'
def process_task(**kwargs):
    '''Python callable used by the generated leaf tasks.

    Args:
        **kwargs: Arbitrary keyword arguments; accepted and ignored.

    Returns:
        The fixed status string 'Processing task...'.
    '''
    return 'Processing task...'
# Helper methods for subdag and subtask creation
def create_sub_dag(parent_dag_name, child_dag_name, start_date, schedule_interval):
    '''Return a DAG whose dag_id is formatted as `parent.child`.

    Args:
        parent_dag_name: dag_id of the parent DAG; becomes the prefix.
        child_dag_name: Name of the child; becomes the suffix after the dot.
        start_date: Forwarded to DAG as `start_date`.
        schedule_interval: Forwarded to DAG as `schedule_interval`.

    Returns:
        A new DAG built with the module-level `default_args` and
        `max_active_runs=15`.
    '''
    return DAG(
        dag_id='{}.{}'.format(parent_dag_name, child_dag_name),
        schedule_interval=schedule_interval,
        start_date=start_date,
        default_args=default_args,
        max_active_runs=15,
    )
def create_tasks(somedag, task_ids):
    '''Create one PythonOperator per task id and attach it to the given dag.

    Args:
        somedag: The DAG passed to each operator as `dag`.
        task_ids: Iterable of task ids; each is converted to a string.

    Returns:
        None. Every operator runs `process_task` as its callable.
    '''
    for task_id in task_ids:
        # The operator is handed its DAG via `dag=`, so the instance itself
        # does not need to be kept in a local variable.
        PythonOperator(
            task_id=str(task_id),
            python_callable=process_task,
            dag=somedag,
        )
# ---------------------------------------------
# Top dag
# Root DAG of the hierarchy, scheduled with the cron expression '0 6 * * *'.
dag_id = 'nested_subdags'
dag = DAG(
    dag_id=dag_id,
    schedule_interval='0 6 * * *',
    max_active_runs=15,
    default_args=default_args,
)

# Initial task of the root DAG; it calls schedule_tasks_method.
schedule_tasks = PythonOperator(
    task_id='schedule_tasks',
    python_callable=schedule_tasks_method,
    provide_context=True,
    dag=dag,
)
# Create the subdags level by level and attach each one to its parent dag.
# Remember, subdags are still DAGs. SubDagOperator only bundles one as a task
# for the parent dag.
level1_list = ['AWS', 'AZURE']
level2_list = ['eu', 'us', 'ap', 'jp']
tasks = ['task_{}'.format(i) for i in range(10)]

level1_subdag_operators = []
for level1_item in level1_list:
    # Level 1: one subdag per item, wrapped as a task of the top dag.
    level1_dag = create_sub_dag(dag_id, level1_item, datetime(2020, 3, 10), '0 6 * * *')
    level1_subdag_operator = SubDagOperator(
        subdag=level1_dag,
        task_id=level1_item,
        dag=dag,
    )
    level1_subdag_operators.append(level1_subdag_operator)

    # The level-1 dag id is the same for every level-2 child, so build it once.
    level1_dag_id = '{}.{}'.format(dag_id, level1_item)
    for level2_item in level2_list:
        # Level 2: one subdag per item, wrapped as a task of the level-1 dag.
        level2_dag = create_sub_dag(level1_dag_id, level2_item, datetime(2020, 3, 10), '0 6 * * *')
        level2_subdag_operator = SubDagOperator(
            subdag=level2_dag,
            task_id=level2_item,
            dag=level1_dag,
        )
        # The leaf tasks live in the innermost (level-2) subdag.
        create_tasks(level2_dag, tasks)

# Set the module-level dag names to None so that they don't appear on the
# Dags UI. Won't be needed if you move this code to another file or scope.
level1_dag = None
level2_dag = None

# This is optional: run every level-1 subdag after the initial task.
schedule_tasks >> level1_subdag_operators
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment