Skip to content

Instantly share code, notes, and snippets.

@apeyroux
Last active October 7, 2022 07:07
Show Gist options
  • Save apeyroux/4c909a610405379c4733f9ae8ca2230f to your computer and use it in GitHub Desktop.
Save apeyroux/4c909a610405379c4733f9ae8ca2230f to your computer and use it in GitHub Desktop.
from datetime import datetime, timedelta
import errno
from os import rename
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.dummy import DummyOperator
#
# Operator that checks whether a run of this DAG with the same configuration is already in progress
# curl -XPOST http://127.0.0.1:8081/api/v1/dags/rename_user/dagRuns -H 'Content-Type: application/json' -u airflow:airflow -d '{"conf":{"uid":"2"}}'
#
from airflow.operators.branch_operator import BaseBranchOperator
from airflow.models.dagrun import DagRun
from airflow.utils.state import State
from airflow.utils.session import provide_session
from airflow.exceptions import AirflowException
class IsRunWithSameConfigOperator(BaseBranchOperator):
    """Branch on whether another running DagRun of this DAG has the same conf.

    Follows ``follow_task_ids_if_true`` when no other RUNNING DagRun of the
    same DAG was triggered with an equal ``conf``; otherwise follows
    ``follow_task_ids_if_false``.

    :param follow_task_ids_if_true: task id(s) to follow when the current
        run is the only running run with its conf.
    :param follow_task_ids_if_false: task id(s) to follow when at least one
        other running run shares the current run's conf.
    """

    def __init__(self, *, follow_task_ids_if_true, follow_task_ids_if_false, **kwargs):
        super().__init__(**kwargs)
        self.follow_task_ids_if_true = follow_task_ids_if_true
        self.follow_task_ids_if_false = follow_task_ids_if_false
        # Kept for backward compatibility; choose_branch does not read it.
        self.follow_branch = follow_task_ids_if_false

    @provide_session
    def choose_branch(self, context, session=None):
        """Return the task id(s) to follow for the current DagRun.

        :param context: Airflow task context; ``dag`` and ``dag_run`` are read.
        :param session: SQLAlchemy session, injected by ``provide_session``
            when the caller does not pass one.
        :returns: ``follow_task_ids_if_true`` or ``follow_task_ids_if_false``.
        :raises AirflowException: if either branch target is ``None``.
        """
        if self.follow_task_ids_if_true is None:
            raise AirflowException(
                "Expected 'follow_task_ids_if_true' parameter is missing."
            )
        if self.follow_task_ids_if_false is None:
            raise AirflowException(
                "Expected 'follow_task_ids_if_false' parameter is missing."
            )
        current_run = context["dag_run"]
        dag_id = context["dag"].dag_id
        # A run triggered without conf may carry None or {}; treat them alike.
        conf = current_run.conf or {}
        # Exclude the current run explicitly instead of expecting the query
        # to match it (the old "count == 1" check): the result then does not
        # depend on the current run matching its own stored conf.
        other_running_runs = session.query(DagRun).filter(
            DagRun.state == State.RUNNING,
            DagRun.dag_id == dag_id,
            DagRun.run_id != current_run.run_id,
        )
        # Compare conf in Python rather than in SQL. DagRun.conf is stored
        # serialized (a PickleType column in Airflow 2.x), so a SQL equality
        # compares serialized bytes. Equal dicts built with a different key
        # order serialize differently and would not match.
        nb_duplicates = sum(
            1 for run in other_running_runs if (run.conf or {}) == conf
        )
        self.log.info("Other running DagRuns with the same conf: %s", nb_duplicates)
        # Two same-conf runs that are RUNNING at the same time each see the
        # other here, so both take the false branch.
        if nb_duplicates == 0:
            return self.follow_task_ids_if_true
        return self.follow_task_ids_if_false
# Manually-triggered DAG. The branch task sends a run down the rename path
# only when no other running run of this DAG has the same conf; otherwise the
# run goes to the error tasks.
# NOTE(review): the description says only one DAG can run at a time, but
# max_active_runs=10 and the branch limits duplicates per conf, not globally.
with DAG(
    'rename_user',
    description='Rename a user. Only one DAG can run at a time',
    # Applied to every task below unless the task overrides it.
    default_args={
        'retries': 2,
        'retry_delay': timedelta(seconds=5),
    },
    start_date=datetime(2021, 1, 1),
    schedule_interval=None,  # no schedule: runs only when triggered
    max_active_runs=10,
    tags=['example'],
) as dag:
    # Rename path: placeholder steps, each overriding retries to 3.
    rename_user_name = BashOperator(
        task_id='rename_user_name', bash_command='sleep 5', retries=3,
    )
    change_user_mail = BashOperator(
        task_id='change_user_mail', bash_command='sleep 5', retries=3,
    )

    # Duplicate path: `error` exits with status 1 (fails), `error_2` exits 0.
    error = BashOperator(task_id='error', bash_command='exit 1')
    error_2 = BashOperator(task_id='error_2', bash_command='exit 0')

    # Join point for both paths. Its trigger rule tolerates skipped upstreams
    # (the branch not taken) but not failed ones.
    notify_talend = BashOperator(
        task_id='notify_talend',
        bash_command='sleep 5',
        trigger_rule='none_failed_min_one_success',
    )

    t0 = IsRunWithSameConfigOperator(
        task_id="the_way_is_open",
        depends_on_past=False,
        follow_task_ids_if_true=['rename_user_name'],
        follow_task_ids_if_false=['error', 'error_2'],
    )

    t0 >> rename_user_name >> change_user_mail >> notify_talend
    t0 >> [error_2, error] >> notify_talend
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment