# DAG dynamically generating tasks
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
# Task-level defaults, passed to the DAG as ``default_args``.
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2018, 10, 1),
    'email': [''],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 10,
    # 0.1 minutes, i.e. six seconds between retry attempts.
    'retry_delay': timedelta(minutes=0.1),
}
# schedule_interval=None gives the DAG no schedule of its own; it only runs
# when a run is triggered explicitly.
dag = DAG(
    dag_id='dynamic_task_number_test',
    default_args=default_args,
    schedule_interval=None,
)
import random
# Entry task: every dynamically generated task is wired to run after it.
# NOTE(review): this call had no task_id or bash_command and was never closed;
# the operator needs both arguments. The values below are placeholders --
# confirm against the intended DAG.
t1 = BashOperator(
    task_id='t1',
    bash_command='echo t1',
    dag=dag,
)

# The fan-out width is drawn at random every time this module is executed, so
# two parses of the same file can define different sets of ls_<i> tasks.
number_tasks = random.choice([1, 2, 3, 4, 5, 6])

# Exit task: runs after all of the dynamically generated tasks.
# NOTE(review): task_id and bash_command are placeholders here as well --
# confirm.
final = BashOperator(
    task_id='final',
    bash_command='echo final',
    dag=dag,
)

# Fan out and back in: t1 -> ls_0 .. ls_<number_tasks - 1> -> final.
for i in range(number_tasks):
    task = BashOperator(
        task_id=f'ls_{i}',
        # NOTE(review): command inferred from the task_id prefix -- confirm.
        bash_command='ls',
        dag=dag,
    )
    t1 >> task
    final << task
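# This seems to fail because the DAG gets changed whilst it is running: the
# file is re-parsed during the run, and random.choice() can pick a different
# number of tasks on each parse. Airflow gets confused about whether upstream
# dependencies have been met, because some tasks get deleted and some get
# created during the run.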
