Reproduce Airflow Subdag Concurrency Issue

Install Airflow

mkdir AirflowTemp
cd AirflowTemp
python -m venv venv
source venv/bin/activate
pip install "SQLAlchemy==1.1.15" "apache-airflow[crypto,password,mysql]==1.9.0"
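The explicit SQLAlchemy pin avoids pulling in SQLAlchemy 1.2, which Airflow 1.9 has known compatibility problems with. A quick sanity check that the pins resolved (a minimal sketch; note that merely importing airflow generates a default airflow.cfg under AIRFLOW_HOME, which defaults to ~/airflow when unset):

import airflow
import sqlalchemy

# Both should match the versions pinned in the install command above.
print(airflow.__version__)     # expected: 1.9.0
print(sqlalchemy.__version__)  # expected: 1.1.15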

Start the database

docker run \
    --detach \
    --name airflow-mariadb-temp \
    --publish 3306:3306 \
    --env "MYSQL_RANDOM_ROOT_PASSWORD=yes" \
    --env "MYSQL_DATABASE=airflow" \
    --env "MYSQL_USER=airflow" \
    --env "MYSQL_PASSWORD=my-secret-db-password" \
    mariadb:latest
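Before pointing Airflow at the database, it is worth verifying that the container accepts connections; MariaDB can take a few seconds to initialize after docker run returns. A minimal sketch using the same connection URL as the sql_alchemy_conn setting below:

import sqlalchemy

engine = sqlalchemy.create_engine(
    'mysql+mysqldb://airflow:my-secret-db-password@127.0.0.1/airflow'
)
# Prints 1 once the database is up and the credentials match.
print(engine.execute('SELECT 1').scalar())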

Airflow Configurations

Set the following in the [core] section of airflow.cfg (Airflow generates this file in AIRFLOW_HOME the first time it runs):

executor = LocalExecutor
sql_alchemy_conn = mysql+mysqldb://airflow:my-secret-db-password@127.0.0.1/airflow
load_examples = False
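To confirm the settings were picked up, they can be read back from Python (a minimal sketch; the module-level configuration helpers used here exist in Airflow 1.9 but were deprecated in later releases):

from airflow import configuration

# Both values should match the edits above.
print(configuration.get('core', 'executor'))          # LocalExecutor
print(configuration.get('core', 'sql_alchemy_conn'))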

Start Airflow

Create a dags folder inside AIRFLOW_HOME and add test_subdag_dag.py (listed below) to it. Then:

export AIRFLOW_HOME=$(pwd)
airflow initdb
airflow webserver    # in one terminal
airflow scheduler    # in a second terminal (export AIRFLOW_HOME there as well)

New DAGs are paused by default (dags_are_paused_at_creation = True), so unpause TestSubdagConcurrencyDag in the UI or via airflow unpause TestSubdagConcurrencyDag before expecting a run.
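After initdb, Airflow's metadata tables should live in MariaDB rather than in a local SQLite file. A quick check, reusing the connection URL from above (a sketch; table_names is the SQLAlchemy 1.1 API):

import sqlalchemy

engine = sqlalchemy.create_engine(
    'mysql+mysqldb://airflow:my-secret-db-password@127.0.0.1/airflow'
)
# Expect Airflow's metadata tables, e.g. dag, task_instance, job.
print(engine.table_names())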

Result

(Screenshot: airflow_subdag_tasks_parallel.png — the subdag's tasks running in parallel.)

test_subdag_dag.py

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import datetime

import airflow
import airflow.operators.subdag_operator
import airflow.operators.bash_operator

DAG_ID = 'TestSubdagConcurrencyDag'
SCHEDULE_INTERVAL = datetime.timedelta(days=1)

default_dag_args = {
    'start_date': datetime.datetime(2018, 1, 1),
}


def subdag_factory(parent_dag, child_dag, *args, **kwargs):
    """Create a SubDag with 5 tasks."""
    subdag = airflow.DAG(
        dag_id='{}.{}'.format(parent_dag, child_dag),
        default_args=default_dag_args,
        schedule_interval=SCHEDULE_INTERVAL,
    )
    # Five independent BashOperator tasks: there are no dependencies
    # between them, so all five are eligible to run at the same time.
    for task_nr in range(5):
        airflow.operators.bash_operator.BashOperator(
            task_id='bash_task_{}'.format(task_nr),
            bash_command='echo {}; sleep 30'.format(task_nr),
            dag=subdag,
        )
    return subdag


with airflow.DAG(
    dag_id=DAG_ID,
    default_args=default_dag_args,
    schedule_interval=SCHEDULE_INTERVAL,
) as dag:
    SUBDAG_TASK_ID = 'subdag_task'
    subdag_task = airflow.operators.subdag_operator.SubDagOperator(
        task_id=SUBDAG_TASK_ID,
        subdag=subdag_factory(DAG_ID, SUBDAG_TASK_ID),
        default_args=default_dag_args,
    )
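For contrast: because the five bash tasks above are independent, the LocalExecutor is free to run them all at once. A hypothetical variant of the loop in subdag_factory (not part of the reproduction) that chains the tasks and forces them to run one at a time:

previous_task = None
for task_nr in range(5):
    task = airflow.operators.bash_operator.BashOperator(
        task_id='bash_task_{}'.format(task_nr),
        bash_command='echo {}; sleep 30'.format(task_nr),
        dag=subdag,
    )
    if previous_task is not None:
        # ">>" makes task downstream of previous_task, so at most one
        # bash_task is running at any time.
        previous_task >> task
    previous_task = task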