Created
February 17, 2021 14:47
-
-
Save yanhaeffner/df5fce622426d70707e635a7914489f8 to your computer and use it in GitHub Desktop.
Basic DAG file to test parallel execution on Airflow
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# First, let's import the Airflow modules and standard-library helpers that we will be using in this DAG file
import time
from datetime import datetime

from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago
# Then, define a "default_args" dict to store a few of our DAG arguments. Using this dict is good practice when writing DAGs, since we won't need to set these values manually later on
# Arguments handed to the DAG constructor through its ``default_args`` parameter.
default_args = {
    "owner": "admin",
    # A fixed calendar date instead of days_ago(1): a start date computed
    # relative to "now" changes every time this file is parsed, which the
    # Airflow documentation advises against. The DAG below is declared with
    # catchup=False, so an old fixed date does not trigger a backfill.
    "start_date": datetime(2021, 2, 16),
}
# Create the Python function that will be executed by the parallel tasks (it simply sleeps for 2 seconds)
def sleep_time(seconds: float = 2) -> None:
    """Pause the caller for a short while.

    Serves as the ``python_callable`` of the parallel tasks so that each of
    them takes a noticeable amount of time to complete.

    Args:
        seconds: Duration of the pause, in seconds. Defaults to 2, the value
            that used to be hard-coded, so a call without arguments behaves
            exactly as before.
    """
    time.sleep(seconds)
# Operators created inside this ``with`` block are created in the context of ``dag``.
with DAG(
    dag_id='MyDAG',
    default_args=default_args,
    schedule_interval='@daily',
    catchup=False,
) as dag:
    # Placeholder tasks that open the workflow.
    begin = DummyOperator(task_id='begin')
    first_task = DummyOperator(task_id='first_task')

    # Three tasks that all run the same callable. No dependency is declared
    # between them, so they are eligible to run side by side (whether they
    # actually do depends on how the Airflow executor is configured).
    first_parallel_task, second_parallel_task, third_parallel_task = parallel_tasks = [
        PythonOperator(task_id=parallel_task_id, python_callable=sleep_time)
        for parallel_task_id in (
            'first_parallel_task',
            'second_parallel_task',
            'third_parallel_task',
        )
    ]

    # Placeholder tasks that close the workflow.
    last_task = DummyOperator(task_id='last_task')
    end = DummyOperator(task_id='end')

    # Now set the dependencies, one edge per statement. Putting the list of
    # parallel tasks on either side of >> applies the dependency to every
    # task in the list: first_task fans out to all of them, and all of them
    # must finish before last_task.
    begin >> first_task
    first_task >> parallel_tasks
    parallel_tasks >> last_task
    last_task >> end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment