Last active
August 18, 2024 04:11
-
-
Save bartosz25/4e35748bb30a6d947b40a73b7815f013 to your computer and use it in GitHub Desktop.
Apache Airflow external trigger example
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
curl -X POST http://localhost:8081/api/experimental/dags/hello_world_a/dag_runs -H 'Cache-Control: no-cache' -H 'Content-Type: application/json' -d '{"conf":"{\"task_payload\":\"payload1\"}"}' | |
curl -X POST http://localhost:8081/api/experimental/dags/hello_world_a/dag_runs -H 'Cache-Control: no-cache' -H 'Content-Type: application/json' -d '{"conf":"{\"task_payload\":\"payload2\"}"}' | |
curl -X POST http://localhost:8081/api/experimental/dags/hello_world_a/dag_runs -H 'Cache-Control: no-cache' -H 'Content-Type: application/json' -d '{"conf":"{\"task_payload\":\"payload3\"}"}' |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
version: '2.1' | |
services: | |
webserver: | |
image: puckel/docker-airflow:1.10.2 | |
restart: always | |
environment: | |
# Loads DAG examples | |
- LOAD_EX=y | |
networks: | |
airflow-external-trigger-network: | |
ipv4_address: 111.18.0.20 | |
volumes: | |
- ./dags:/usr/local/airflow/dags | |
- ./requirements.txt:/requirements.txt | |
- ./scripts:/tmp | |
ports: | |
- "8081:8080" | |
command: webserver | |
healthcheck: | |
test: ["CMD-SHELL", "[ -f /usr/local/airflow/airflow-webserver.pid ]"] | |
interval: 30s | |
timeout: 30s | |
retries: 3 | |
rabbitmq: | |
image: rabbitmq:3.7-management | |
restart: always | |
networks: | |
airflow-external-trigger-network: | |
ipv4_address: 111.18.0.21 | |
hostname: rabbitmqdocker | |
ports: | |
- "15672:15672" | |
networks: | |
airflow-external-trigger-network: | |
driver: bridge | |
ipam: | |
driver: default | |
config: | |
- subnet: 111.18.0.0/16 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json | |
import pika | |
from airflow import utils | |
from airflow.models import DAG | |
from airflow.operators.dagrun_operator import TriggerDagRunOperator | |
from airflow.operators.dummy_operator import DummyOperator | |
from airflow.operators.python_operator import BranchPythonOperator | |
dag = DAG( | |
dag_id='external_trigger', | |
default_args={ | |
"owner": "airflow", | |
'start_date': utils.dates.days_ago(1), | |
}, | |
schedule_interval='*/1 * * * *', | |
) | |
def consume_message(**kwargs): | |
connection = pika.BlockingConnection(pika.ConnectionParameters('111.18.0.21')) | |
channel = connection.channel() | |
channel.queue_declare(queue='external_airflow_triggers', durable=True) | |
method_frame, header_frame, body = channel.basic_get(queue='external_airflow_triggers') | |
if body: | |
json_params = json.loads(body) | |
kwargs['ti'].xcom_push(key='job_params', value=json.dumps(json_params['params'])) | |
channel.basic_ack(delivery_tag=method_frame.delivery_tag) | |
connection.close() | |
print("Got message ? {}".format(body)) | |
return json_params['task'] | |
else: | |
return 'task_trash' | |
router = BranchPythonOperator( | |
task_id='router', | |
python_callable=consume_message, | |
dag=dag, | |
provide_context=True, | |
depends_on_past=True | |
) | |
def trigger_dag_with_context(context, dag_run_obj): | |
ti = context['task_instance'] | |
job_params = ti.xcom_pull(key='job_params', task_ids='router') | |
dag_run_obj.payload = {'task_payload': job_params} | |
return dag_run_obj | |
task_a = trigger = TriggerDagRunOperator( | |
task_id='hello_world_a', | |
trigger_dag_id="hello_world_a", | |
python_callable=trigger_dag_with_context, | |
params={'condition_param': True, 'task_payload': '{}'}, | |
dag=dag, | |
provide_context=True, | |
) | |
task_b = TriggerDagRunOperator( | |
task_id='hello_world_b', | |
trigger_dag_id="hello_world_b", | |
python_callable=trigger_dag_with_context, | |
params={'condition_param': True, 'task_payload': '{}'}, | |
dag=dag, | |
provide_context=True, | |
) | |
task_c = TriggerDagRunOperator( | |
task_id='hello_world_c', | |
trigger_dag_id="hello_world_c", | |
python_callable=trigger_dag_with_context, | |
params={'task_payload': '{}'}, | |
dag=dag, | |
provide_context=True, | |
) | |
task_trash = DummyOperator( | |
task_id='task_trash', | |
dag=dag | |
) | |
router >> task_a | |
router >> task_b | |
router >> task_c | |
router >> task_trash |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import airflow | |
from airflow import DAG | |
from airflow.operators.python_operator import PythonOperator | |
dag = DAG( | |
dag_id='hello_world_a', | |
default_args={ | |
"owner": "airflow", | |
'start_date': airflow.utils.dates.days_ago(1), | |
}, | |
schedule_interval=None | |
) | |
def print_hello(**context): | |
task_params = context['dag_run'].conf['task_payload'] | |
print('Hello world a with {}'.format(task_params)) | |
PythonOperator( | |
task_id='hello_world_printer', | |
python_callable=print_hello, | |
provide_context=True, | |
dag=dag) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import airflow | |
from airflow import DAG | |
from airflow.operators.python_operator import PythonOperator | |
dag = DAG( | |
dag_id='hello_world_b', | |
default_args={ | |
"owner": "airflow", | |
'start_date': airflow.utils.dates.days_ago(2), | |
}, | |
schedule_interval=None | |
) | |
def print_hello(**context): | |
task_params = context['dag_run'].conf['task_payload'] | |
print('Hello world b with {}'.format(task_params)) | |
PythonOperator( | |
task_id='hello_world_printer', | |
python_callable=print_hello, | |
provide_context=True, | |
dag=dag) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import airflow | |
from airflow import DAG | |
from airflow.operators.python_operator import PythonOperator | |
dag = DAG( | |
dag_id='hello_world_c', | |
default_args={ | |
"owner": "airflow", | |
'start_date': airflow.utils.dates.days_ago(2), | |
}, | |
schedule_interval=None | |
) | |
def print_hello(**context): | |
task_params = context['dag_run'].conf['task_payload'] | |
print('Hello world c with {}'.format(task_params)) | |
PythonOperator( | |
task_id='hello_world_printer', | |
python_callable=print_hello, | |
provide_context=True, | |
dag=dag) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json | |
from datetime import datetime | |
from random import randint, choice | |
from time import sleep | |
import pika | |
connection = pika.BlockingConnection(pika.ConnectionParameters('111.18.0.21')) | |
channel = connection.channel() | |
channel.queue_declare(queue='external_airflow_triggers', durable=True) | |
tasks = ['hello_world_a', 'hello_world_b', 'hello_world_c'] | |
while True: | |
print('Producing messages at {}'.format(datetime.utcnow())) | |
task_to_trigger = choice(tasks) | |
event_time = str(datetime.utcnow()) | |
message = json.dumps( | |
{'task': task_to_trigger, 'params': {'event_time': event_time, 'value': randint(0, 10000)}} | |
) | |
channel.basic_publish(exchange='', routing_key='external_airflow_triggers', | |
body=message) | |
print(" [x] Sent {}".format(message)) | |
sleep(2) | |
connection.close() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# for messages producer & local dev | |
pika | |
apache-airflow |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# only for Docker image | |
pika |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment