Skip to content

Instantly share code, notes, and snippets.

@goodbyegangster
Created June 27, 2022 02:16
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
Star You must be signed in to star a gist
Embed
What would you like to do?
A Sample ExternalTaskSensor DAG
from datetime import datetime, timedelta, timezone
from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.sensors.external_task import (
ExternalTaskMarker,
ExternalTaskSensor,
)
# Fixed UTC+9 offset labelled "JST"; used as the tzinfo of each DAG's start_date.
JST = timezone(timedelta(hours=9), "JST")

# Parent half of the sample pair: one empty task followed by an
# ExternalTaskMarker that names the sensor task living in the child DAG.
with DAG(
    dag_id="external_dag_parent",
    description="A Sample ExternalTaskMarker DAG",
    schedule_interval=timedelta(minutes=3),
    start_date=datetime(2022, 6, 27, hour=11, tzinfo=JST),
    catchup=False,
    tags=["example"],
) as parent_dag:
    # NOTE: the "biginning" spelling is kept on purpose -- the task_id is a
    # runtime identifier, so renaming it would change the DAG's task set.
    biginning_empty = EmptyOperator(task_id="biginning_empty")

    # ExternalTaskMarker reference implementation:
    # https://github.com/apache/airflow/blob/main/airflow/sensors/external_task.py
    # The marker points at task "child_task" inside DAG "external_dag_child".
    parent_task = ExternalTaskMarker(
        task_id="parent_task",
        external_dag_id="external_dag_child",
        external_task_id="child_task",
    )

    # Dependency wiring: biginning_empty runs before parent_task.
    parent_task.set_upstream(biginning_empty)
# Child half of the sample pair: a sensor that watches the parent DAG's
# marker task, followed by one empty task.
with DAG(
    dag_id="external_dag_child",
    description="A Sample ExternalTaskSensor DAG",
    schedule_interval=timedelta(minutes=3),
    start_date=datetime(2022, 6, 27, hour=11, tzinfo=JST),
    catchup=False,
    tags=["example"],
) as child_dag:
    # ExternalTaskSensor reference implementation:
    # https://github.com/apache/airflow/blob/main/airflow/sensors/external_task.py
    # The watched DAG/task ids are read from the parent objects defined
    # earlier in this file rather than repeated as string literals.
    child_task = ExternalTaskSensor(
        task_id="child_task",
        external_dag_id=parent_dag.dag_id,
        external_task_id=parent_task.task_id,
        timeout=600,
        allowed_states=["success"],
        failed_states=["failed", "skipped"],
        mode="poke",
    )

    finishing_empty = EmptyOperator(task_id="finishing_empty")

    # Dependency wiring: child_task runs before finishing_empty.
    child_task.set_downstream(finishing_empty)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment