# DAG dynamically generating tasks
#
# Originally published as GitHub gist RobinL/f79cc27f1620eb842306bcb709e802cd
# by @RobinL, created July 14, 2018.
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
import random

# Test DAG: ``print_date_1`` fans out to a computed number of ``ls_<i>``
# tasks, all of which feed into ``final``.

# Identifier used both for the DAG and as the seed of the task-count draw.
DAG_ID = 'dynamic_task_number_test'

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    # NOTE(review): Airflow does not run task instances whose execution date
    # precedes start_date; confirm this date is intended.
    'start_date': datetime(2018, 10, 1),
    'email': ['robinlinacre@hotmail.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 10,
    'retry_delay': timedelta(seconds=6),
}

# schedule_interval=None: the DAG is never scheduled automatically and only
# runs when triggered externally (e.g. manually).
dag = DAG(DAG_ID, default_args=default_args, schedule_interval=None)

t1 = BashOperator(
    task_id='print_date_1',
    bash_command='date',
    dag=dag,
)

# Number of ``ls_<i>`` tasks to generate.
#
# Airflow re-imports this file repeatedly, including while a run of this DAG
# is in progress, so anything that shapes the DAG must evaluate to the same
# value on every parse.
#
# A bare ``random.choice`` re-rolled the count on each parse. ``ls_*`` tasks
# were then created and deleted mid-run, and Airflow could no longer tell
# whether ``final``'s upstream dependencies had been met.
#
# A private generator seeded with a string is deterministic across processes
# (str seeds are converted via a fixed digest, not ``hash()``), so the count is
# still drawn from 1..6 but is stable. To reshape the DAG deliberately, change
# the seed or replace this with a constant/config value between runs.
number_tasks = random.Random(DAG_ID).choice([1, 2, 3, 4, 5, 6])

final = BashOperator(
    task_id='final',
    bash_command='date',
    dag=dag,
)

# Fan out/in: print_date_1 -> ls_0 .. ls_{number_tasks-1} -> final.
for i in range(number_tasks):
    task = BashOperator(
        task_id=f'ls_{i}',
        bash_command='ls',
        retries=3,  # explicit kwarg takes precedence over default_args['retries']
        dag=dag,
    )
    t1 >> task
    task >> final
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment