Skip to content

Instantly share code, notes, and snippets.

@Priyanku-AI
Last active November 10, 2023 18:35
Show Gist options
  • Save Priyanku-AI/9a436bcceb15ecb6d23f6c80d75db44b to your computer and use it in GitHub Desktop.
Save Priyanku-AI/9a436bcceb15ecb6d23f6c80d75db44b to your computer and use it in GitHub Desktop.
Building a Monitoring and Alerting System using Airflow's Notification and SLA features from scratch.
pip install apache-airflow
airflow initdb
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
# Define default_args dictionary to pass to the DAG
default_args = {
'owner': 'me',
'start_date': datetime(2022, 1, 1),
'depends_on_past': False,
'retries': 1,
'retry_delay': timedelta(minutes=5)
}
# Instantiate a DAG
dag = DAG(
'machine_learning_pipeline',
default_args=default_args,
schedule_interval=timedelta(days=1) # Run the pipeline every day
)
# Define a task for data preprocessing
def data_preprocessing():
# Your code for data preprocessing goes here
pass
data_preprocessing_task = PythonOperator(
task_id='data_preprocessing',
python_callable=data_preprocessing,
dag=dag
)
# Define a task for training
def training():
# Your code for training goes here
pass
training_task = PythonOperator(
task_id='training',
python_callable=training,
dag=dag
)
# Define a task for deployment
def deployment():
# Your code for deployment goes here
pass
deployment_task = PythonOperator(
task_id='deployment',
python_callable=deployment,
dag=dag
)
# Set the order of the tasks
training_task.set_upstream(data_preprocessing_task)
deployment_task.set_upstream(training_task)
airflow run machine_learning_pipeline data_preprocessing
airflow webserver
from airflow.operators.email_operator import EmailOperator
task_fail_email = EmailOperator(
task_id='send_failure_email',
to='your_email@example.com',
subject='Airflow task failed',
html_content='Task {{ task_instance.task_id }} failed',
trigger_rule='one_failed',
dag=dag)
task >> task_fail_email
from airflow.contrib.operators.slack_webhook_operator import SlackWebhookOperator
task_fail_slack = SlackWebhookOperator(
task_id='send_failure_slack',
http_conn_id='slack',
webhook_token=SLACK_WEBHOOK_TOKEN,
message='Task {{ task_instance.task_id }} failed',
trigger_rule='one_failed',
dag=dag)
task >> task_fail_slack
my_task = BashOperator(
task_id='my_task',
bash_command='echo "Hello World"',
sla=timedelta(hours=2),
dag=dag
)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment