Skip to content

Instantly share code, notes, and snippets.

@slotrans
Last active June 6, 2017 00:46
Show Gist options
  • Save slotrans/b3e475c2b9789c4efc41876567902425 to your computer and use it in GitHub Desktop.
Save slotrans/b3e475c2b9789c4efc41876567902425 to your computer and use it in GitHub Desktop.
simple-ish Airflow DAG for which some tasks never execute
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
# Arguments applied to every operator created with dag=dag. Airflow only uses
# the keys that match an operator constructor parameter (owner, retries,
# start_date, ...). DAG-level settings such as concurrency must be passed to
# DAG() itself; placing them here has no effect.
default_args = {
    'owner': 'noah.yetter',
    'depends_on_past': False,
    'start_date': datetime(2017, 5, 23),
    'email': ['noah@craftsy.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0,
    # 'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
}

# One run per day starting from default_args['start_date'].
# concurrency=4 caps how many task instances of this DAG may run at once.
# It is a DAG argument: inside default_args it was silently ignored and the
# DAG used the configured [core] dag_concurrency instead.
dag = DAG(
    dag_id='tasks_never_run',
    default_args=default_args,
    schedule_interval=timedelta(days=1),
    concurrency=4,
)
# Twenty words that determine the DAG's shape. The task-building loop creates
# one root task and five leaf tasks per word, and every task_id embeds the
# word, so the words must be distinct for the task_ids to be distinct.
words = '''
    determined blonde cinder arrows foggy
    lights advancement barnacle bronze island
    frequency beyond fuse chief dozen
    ambition axe fuzz fiasco candy
'''.split()
# (label, sleep seconds) for the five leaf tasks fanned out under each root.
# The label appears in the task_id, in the echoed text and in the output
# file name.
LEAF_SPECS = [
    ('one', 1),
    ('two', 2),
    ('three', 3),
    ('four', 4),
    ('five', 5),
]

# Bash command for a leaf task. The %-placeholders (sleep seconds, label,
# label) are filled in here by Python. The {{ ... }} parts are Jinja, left
# intact for Airflow to render at run time: `ds` is Airflow's execution-date
# stamp and `params.word` comes from the operator's params.
LEAF_BASH_TEMPLATE = (
    'sleep %d && mkdir -p /tmp/tasks_never_run/{{ ds }} && '
    'echo "{{ params.word }} %s" > '
    '/tmp/tasks_never_run/{{ ds }}/leaf_%s__{{ params.word }}'
)

for w in words:
    # Root task for this word: sleeps 10s, then writes the word to
    # /tmp/tasks_never_run/<ds>/root__<word>.
    t_root = BashOperator(
        dag=dag,
        task_id='t_root__{}'.format(w),
        bash_command=(
            'sleep 10 && mkdir -p /tmp/tasks_never_run/{{ ds }} && '
            'echo "{{ params.word }}" > '
            '/tmp/tasks_never_run/{{ ds }}/root__{{ params.word }}'
        ),
        params=dict(word=w),
    )

    # Five leaves, each downstream of this word's root only.
    for label, sleep_secs in LEAF_SPECS:
        t_leaf = BashOperator(
            dag=dag,
            task_id='t_leaf_{}__{}'.format(label, w),
            bash_command=LEAF_BASH_TEMPLATE % (sleep_secs, label, label),
            params=dict(word=w),
        )
        t_leaf.set_upstream(t_root)
if __name__ == '__main__':
    # When this file is executed directly (not imported by the scheduler),
    # hand control to Airflow's command-line interface for this DAG.
    dag.cli()
### CONFIG NOTES
# running against a local Postgres DB
# executor = LocalExecutor
# sql_alchemy_pool_size = 10
# job_heartbeat_sec = 30
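# scheduler_heartbeat_sec = 30  (NOTE: originally written as "scheduler_hearbeat_sec"; Airflow only reads the correctly spelled key, so if airflow.cfg used the misspelling this setting never took effect and the default applied)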
# catchup_by_default = False
# ...all other config values unchanged from default
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment