Apache Airflow external trigger example: a RabbitMQ producer publishes trigger messages, a router DAG consumes them once a minute and dispatches to one of three hello_world DAGs, and the curl commands below trigger hello_world_a directly through Airflow's experimental REST API (the webserver is published on host port 8081 in the docker-compose file further down):
curl -X POST http://localhost:8081/api/experimental/dags/hello_world_a/dag_runs -H 'Cache-Control: no-cache' -H 'Content-Type: application/json' -d '{"conf":"{\"task_payload\":\"payload1\"}"}'
curl -X POST http://localhost:8081/api/experimental/dags/hello_world_a/dag_runs -H 'Cache-Control: no-cache' -H 'Content-Type: application/json' -d '{"conf":"{\"task_payload\":\"payload2\"}"}'
curl -X POST http://localhost:8081/api/experimental/dags/hello_world_a/dag_runs -H 'Cache-Control: no-cache' -H 'Content-Type: application/json' -d '{"conf":"{\"task_payload\":\"payload3\"}"}'
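The same trigger can be issued from Python. A minimal sketch using the requests library (an assumption on my part; it is not listed in this gist's requirements files):

import json

import requests

# Same experimental endpoint the curl commands above hit; the API expects the
# run configuration as a JSON-encoded *string* under the "conf" key.
response = requests.post(
    'http://localhost:8081/api/experimental/dags/hello_world_a/dag_runs',
    headers={'Cache-Control': 'no-cache', 'Content-Type': 'application/json'},
    data=json.dumps({'conf': json.dumps({'task_payload': 'payload1'})}),
)
response.raise_for_status()
print(response.json())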
# docker-compose.yml: Airflow webserver plus RabbitMQ on a bridge network with fixed addresses.
version: '2.1'
services:
  webserver:
    image: puckel/docker-airflow:1.10.2
    restart: always
    environment:
      # Loads DAG examples
      - LOAD_EX=y
    networks:
      airflow-external-trigger-network:
        ipv4_address: 111.18.0.20
    volumes:
      - ./dags:/usr/local/airflow/dags
      - ./requirements.txt:/requirements.txt
      - ./scripts:/tmp
    ports:
      - "8081:8080"
    command: webserver
    healthcheck:
      test: ["CMD-SHELL", "[ -f /usr/local/airflow/airflow-webserver.pid ]"]
      interval: 30s
      timeout: 30s
      retries: 3
  rabbitmq:
    image: rabbitmq:3.7-management
    restart: always
    networks:
      airflow-external-trigger-network:
        ipv4_address: 111.18.0.21
    hostname: rabbitmqdocker
    ports:
      - "15672:15672"
networks:
  airflow-external-trigger-network:
    driver: bridge
    ipam:
      driver: default
      config:
        - subnet: 111.18.0.0/16
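Before running the producer, it is worth checking that RabbitMQ is reachable at the static address assigned above. A minimal sketch, assuming the host can route to the 111.18.0.0/16 compose subnet directly (typically true for Docker on Linux; on macOS/Windows you would need to publish AMQP port 5672 instead):

import pika

# Open and immediately close a connection to the broker's fixed address.
connection = pika.BlockingConnection(pika.ConnectionParameters(host='111.18.0.21'))
print('Broker reachable:', connection.is_open)
connection.close()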
# Router DAG "external_trigger": polls RabbitMQ once a minute and branches to the
# task that triggers the DAG named in the message, or to task_trash when the queue is empty.
import json

import pika
from airflow import utils
from airflow.models import DAG
from airflow.operators.dagrun_operator import TriggerDagRunOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import BranchPythonOperator

dag = DAG(
    dag_id='external_trigger',
    default_args={
        'owner': 'airflow',
        'start_date': utils.dates.days_ago(1),
    },
    schedule_interval='*/1 * * * *',
)


def consume_message(**kwargs):
    """Pop one message off the queue and return the task_id of the branch to follow."""
    connection = pika.BlockingConnection(pika.ConnectionParameters('111.18.0.21'))
    channel = connection.channel()
    channel.queue_declare(queue='external_airflow_triggers', durable=True)
    method_frame, header_frame, body = channel.basic_get(queue='external_airflow_triggers')
    if body:
        json_params = json.loads(body)
        # Hand the payload to the downstream trigger task via XCom.
        kwargs['ti'].xcom_push(key='job_params', value=json.dumps(json_params['params']))
        channel.basic_ack(delivery_tag=method_frame.delivery_tag)
        connection.close()
        print('Got message: {}'.format(body))
        return json_params['task']
    else:
        connection.close()
        return 'task_trash'


router = BranchPythonOperator(
    task_id='router',
    python_callable=consume_message,
    dag=dag,
    provide_context=True,
    depends_on_past=True,
)


def trigger_dag_with_context(context, dag_run_obj):
    """Copy the payload pushed by the router onto the triggered DAG run."""
    ti = context['task_instance']
    job_params = ti.xcom_pull(key='job_params', task_ids='router')
    dag_run_obj.payload = {'task_payload': job_params}
    return dag_run_obj


task_a = TriggerDagRunOperator(
    task_id='hello_world_a',
    trigger_dag_id='hello_world_a',
    python_callable=trigger_dag_with_context,
    params={'condition_param': True, 'task_payload': '{}'},
    dag=dag,
)

task_b = TriggerDagRunOperator(
    task_id='hello_world_b',
    trigger_dag_id='hello_world_b',
    python_callable=trigger_dag_with_context,
    params={'condition_param': True, 'task_payload': '{}'},
    dag=dag,
)

task_c = TriggerDagRunOperator(
    task_id='hello_world_c',
    trigger_dag_id='hello_world_c',
    python_callable=trigger_dag_with_context,
    params={'task_payload': '{}'},
    dag=dag,
)

# Branch target when the queue is empty; nothing to do.
task_trash = DummyOperator(
    task_id='task_trash',
    dag=dag,
)

router >> [task_a, task_b, task_c, task_trash]
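For reference, the router expects queue messages shaped like the ones the producer script at the bottom of this gist publishes: a JSON object whose 'task' key names the branch/DAG to trigger and whose 'params' key is carried through as the dag_run payload. A one-off publish by hand, using the same broker address and queue as above:

import json

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('111.18.0.21'))
channel = connection.channel()
channel.queue_declare(queue='external_airflow_triggers', durable=True)
# 'task' must match one of the router's branch task_ids (hello_world_a/b/c).
channel.basic_publish(
    exchange='',
    routing_key='external_airflow_triggers',
    body=json.dumps({'task': 'hello_world_a', 'params': {'value': 42}}),
)
connection.close()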
# Target DAG "hello_world_a": triggered by the router, prints the payload it receives.
import airflow
from airflow import DAG
from airflow.operators.python_operator import PythonOperator

dag = DAG(
    dag_id='hello_world_a',
    default_args={
        'owner': 'airflow',
        'start_date': airflow.utils.dates.days_ago(1),
    },
    schedule_interval=None,
)


def print_hello(**context):
    task_params = context['dag_run'].conf['task_payload']
    print('Hello world a with {}'.format(task_params))


PythonOperator(
    task_id='hello_world_printer',
    python_callable=print_hello,
    provide_context=True,
    dag=dag,
)
# Target DAG "hello_world_b": same structure as hello_world_a.
import airflow
from airflow import DAG
from airflow.operators.python_operator import PythonOperator

dag = DAG(
    dag_id='hello_world_b',
    default_args={
        'owner': 'airflow',
        'start_date': airflow.utils.dates.days_ago(2),
    },
    schedule_interval=None,
)


def print_hello(**context):
    task_params = context['dag_run'].conf['task_payload']
    print('Hello world b with {}'.format(task_params))


PythonOperator(
    task_id='hello_world_printer',
    python_callable=print_hello,
    provide_context=True,
    dag=dag,
)
# Target DAG "hello_world_c": same structure as hello_world_a.
import airflow
from airflow import DAG
from airflow.operators.python_operator import PythonOperator

dag = DAG(
    dag_id='hello_world_c',
    default_args={
        'owner': 'airflow',
        'start_date': airflow.utils.dates.days_ago(2),
    },
    schedule_interval=None,
)


def print_hello(**context):
    task_params = context['dag_run'].conf['task_payload']
    print('Hello world c with {}'.format(task_params))


PythonOperator(
    task_id='hello_world_printer',
    python_callable=print_hello,
    provide_context=True,
    dag=dag,
)
# Producer script: publishes a trigger message for a random hello_world DAG every two seconds.
import json
from datetime import datetime
from random import randint, choice
from time import sleep

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('111.18.0.21'))
channel = connection.channel()
channel.queue_declare(queue='external_airflow_triggers', durable=True)

tasks = ['hello_world_a', 'hello_world_b', 'hello_world_c']

while True:
    print('Producing messages at {}'.format(datetime.utcnow()))
    task_to_trigger = choice(tasks)
    event_time = str(datetime.utcnow())
    message = json.dumps(
        {'task': task_to_trigger, 'params': {'event_time': event_time, 'value': randint(0, 10000)}}
    )
    channel.basic_publish(exchange='', routing_key='external_airflow_triggers', body=message)
    print(' [x] Sent {}'.format(message))
    sleep(2)

connection.close()  # unreachable; stop the producer with Ctrl+C
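One caveat: the queue is declared durable, but basic_publish sends transient messages by default, so queued triggers are lost if the broker restarts. A possible tweak (my suggestion, not part of the original script) is to mark each message persistent:

import json

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('111.18.0.21'))
channel = connection.channel()
channel.queue_declare(queue='external_airflow_triggers', durable=True)
channel.basic_publish(
    exchange='',
    routing_key='external_airflow_triggers',
    body=json.dumps({'task': 'hello_world_c', 'params': {'value': 7}}),
    # delivery_mode=2 marks the message persistent; combined with the durable
    # queue declaration, pending triggers survive a RabbitMQ restart.
    properties=pika.BasicProperties(delivery_mode=2),
)
connection.close()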
# for messages producer & local dev
pika
apache-airflow

# only for Docker image
pika