Skip to content

Instantly share code, notes, and snippets.

@plieningerweb
Last active July 3, 2017 11:21
Show Gist options
  • Save plieningerweb/3185f14f1437cfc9b5722e1f041da7ca to your computer and use it in GitHub Desktop.
Save plieningerweb/3185f14f1437cfc9b5722e1f041da7ca to your computer and use it in GitHub Desktop.
from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago
# The DAG object Airflow discovers in this module.
# schedule_interval='@once' is Airflow's preset for "run a single time".
# start_date comes from days_ago(1) — presumably a date one day before parse
# time, so the single run is immediately due; confirm against the Airflow
# version in use.
dag = DAG(
    dag_id='example_runtaskonce',
    start_date=days_ago(1),
    schedule_interval='@once',
)
# Build one serial chain of per-folder sections hanging off a single root:
#
#   root -> runonce_folder-0 -> read_folders_in_folder-0 -> join_folder-0
#                           \-> dummy_skip_folder-0      -/
#        -> runonce_folder-1 -> ...                      -> join_folder-2
#
# Each section's join task becomes the upstream ("parent") of the next
# section's runonce task, so the folders are processed one after another.
parent = DummyOperator(dag=dag, task_id='root')

folders = ['folder-{}'.format(i) for i in range(3)]

for folder in folders:
    task_id_read_folders_in = 'read_folders_in_{}'.format(folder)
    task_id_dummy_skip = 'dummy_skip_{}'.format(folder)

    # NOTE(review): RunOnceBranchOperator is neither imported nor defined in
    # this file; it presumably comes from a custom plugin/module and must be
    # imported before this DAG can be parsed. Judging by its arguments it
    # appears to branch to either run_once_task_id (the real work) or
    # skip_task_id (the no-op) — confirm against its implementation.
    runonce = RunOnceBranchOperator(
        dag=dag,
        task_id='runonce_{}'.format(folder),
        run_once_task_id=task_id_read_folders_in,
        skip_task_id=task_id_dummy_skip,
    )
    runonce.set_upstream(parent)

    # No-op branch target used when the real work is not run.
    dummy_skip = DummyOperator(dag=dag, task_id=task_id_dummy_skip)
    dummy_skip.set_upstream(runonce)

    if folder == 'folder-1':
        # cmd will fail (assuming /tmp/not-there does not exist); kept to
        # exercise the failure path for one folder.
        cmd = 'cat /tmp/not-there'
    else:
        cmd = 'echo 1'
    read_folders_in = BashOperator(
        task_id=task_id_read_folders_in, bash_command=cmd, dag=dag)
    read_folders_in.set_upstream(runonce)

    # Reconverge the two branches. 'one_success' lets the join fire once at
    # least one upstream succeeds, which is needed because only one of the
    # two branches is expected to run.
    join = DummyOperator(
        task_id='join_{}'.format(folder),
        trigger_rule='one_success',
        dag=dag,
    )
    join.set_upstream(dummy_skip)
    join.set_upstream(read_folders_in)

    # Chain the next folder's section after this one.
    parent = join
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment