Last active: November 10, 2023 18:35
-
-
Save Priyanku-AI/9a436bcceb15ecb6d23f6c80d75db44b to your computer and use it in GitHub Desktop.
Building a monitoring and alerting system from scratch using Airflow's notification and SLA features.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Install Airflow. The official installation guide recommends pinning
# dependencies with a constraints file, e.g.
#   pip install "apache-airflow==<version>" --constraint <constraints-url>
pip install apache-airflow
# On Airflow 2, SlackWebhookOperator ships in the separate Slack provider package.
pip install apache-airflow-providers-slack

# Create/upgrade the metadata database. `airflow initdb` was the Airflow 1.10
# command; Airflow 2.0-2.6 use `airflow db init`, Airflow 2.7+ use `airflow db migrate`.
airflow db migrate
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator  # `python_operator` is the deprecated 1.x path

# Arguments applied to every task in the DAG unless a task overrides them.
default_args = {
    'owner': 'me',
    'start_date': datetime(2022, 1, 1),
    'depends_on_past': False,
    'retries': 1,                         # retry a failed task once...
    'retry_delay': timedelta(minutes=5),  # ...after waiting 5 minutes
}

# The pipeline runs once a day. catchup=False stops the scheduler from
# creating one run for every missed day between start_date (2022-01-01) and
# the moment the DAG is first enabled; only the latest interval is scheduled.
dag = DAG(
    'machine_learning_pipeline',
    default_args=default_args,
    schedule_interval=timedelta(days=1),  # spelled `schedule=` on Airflow >= 2.4
    catchup=False,
)
def data_preprocessing():
    """Preprocess the raw data (placeholder: currently does nothing)."""
    # TODO: replace with the real data-preprocessing logic.


# Airflow task wrapping data_preprocessing().
data_preprocessing_task = PythonOperator(
    dag=dag,
    task_id='data_preprocessing',
    python_callable=data_preprocessing,
)
def training():
    """Train the model (placeholder: currently does nothing)."""
    # TODO: replace with the real training logic.


# Airflow task wrapping training().
training_task = PythonOperator(
    dag=dag,
    task_id='training',
    python_callable=training,
)
def deployment():
    """Deploy the trained model (placeholder: currently does nothing)."""
    # TODO: replace with the real deployment logic.


# Airflow task wrapping deployment().
deployment_task = PythonOperator(
    dag=dag,
    task_id='deployment',
    python_callable=deployment,
)
# Chain the pipeline: preprocessing runs first, then training, then deployment.
data_preprocessing_task >> training_task >> deployment_task
# Try a single task without the scheduler. `airflow run` was the Airflow 1.10
# command and also required an execution date. On Airflow 2 use `tasks test`:
# it runs the task for the given logical date without checking dependencies
# and without recording state in the metadata DB.
airflow tasks test machine_learning_pipeline data_preprocessing 2022-01-01

# Start the web UI (this blocks the terminal). Scheduled runs, failure alerts
# and SLA checks also need the scheduler, started in a separate terminal:
#   airflow scheduler
airflow webserver
from airflow.operators.email import EmailOperator  # `email_operator` is the deprecated 1.x path

# Alert by e-mail when a pipeline task fails. trigger_rule='one_failed' lets
# this task run as soon as at least one upstream task has failed, instead of
# the default 'all_success' which would only run it when everything succeeded.
# NOTE(review): EmailOperator relies on Airflow's SMTP settings — confirm they
# are configured for this deployment.
task_fail_email = EmailOperator(
    task_id='send_failure_email',
    to='your_email@example.com',
    subject='Airflow task failed',
    # In this template `task_instance` is the e-mail task itself, so
    # '{{ task_instance.task_id }}' would always render "send_failure_email".
    # Report the DAG and run that failed instead.
    html_content='A task in DAG {{ dag.dag_id }} failed (run {{ run_id }}).',
    trigger_rule='one_failed',
    dag=dag,
)
# Watch every pipeline task so a failure anywhere triggers the alert.
[data_preprocessing_task, training_task, deployment_task] >> task_fail_email
# airflow.contrib no longer exists in Airflow 2; the operator now ships in the
# apache-airflow-providers-slack package.
from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator

# Post a failure alert to Slack. The webhook is taken from the Airflow
# connection 'slack' (store the webhook URL/token there) instead of a token
# hard-coded in the DAG file.
task_fail_slack = SlackWebhookOperator(
    task_id='send_failure_slack',
    slack_webhook_conn_id='slack',
    # '{{ task_instance.task_id }}' would render this Slack task's own id, so
    # report the DAG and run that failed instead.
    message='A task in DAG {{ dag.dag_id }} failed (run {{ run_id }}).',
    trigger_rule='one_failed',  # run as soon as any upstream task has failed
    dag=dag,
)
# Watch every pipeline task so a failure anywhere triggers the alert.
[data_preprocessing_task, training_task, deployment_task] >> task_fail_slack
from airflow.operators.bash import BashOperator  # was used without being imported

# Example task with a service-level agreement: it is expected to finish within
# 2 hours, and a miss is reported by the scheduler.
# NOTE(review): per the Airflow SLA docs, manually triggered runs are not
# checked for SLA misses — confirm this fits the intended monitoring.
my_task = BashOperator(
    task_id='my_task',
    bash_command='echo "Hello World"',
    sla=timedelta(hours=2),
    dag=dag,
)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.