Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save juangesino/544aca8124459c2afa72a61e4fbe8d88 to your computer and use it in GitHub Desktop.
Save juangesino/544aca8124459c2afa72a61e4fbe8d88 to your computer and use it in GitHub Desktop.
How we use Airflow for SQL alerts and Slack notifications

How we use Airflow for SQL alerts and Slack notifications

# Function that takes the config and creates a DAG
def create_dag_alert(config):
    """Outline of the factory that turns one alert config into a DAG.

    Only the three-part structure is shown here; every part is elided
    with ``...``.

    Args:
        config: The alert configuration to build the DAG from.
    """
    # Part 1: Define the DAG
    ...
    # Part 2: Task function
    ...
    # Part 3: Setup the DAG and task
    ...
# Give the alert a name
# This will be sent in the notification
name: Example Alert
# This will be the name of the generated DAG
# Should be a short and machine-friendly name
# Use underscores (`_`) instead of spaces and use lowercase
# We will add an `alert_` prefix and a `_dag` suffix, so no need to include that
nickname: example
# Add a description to describe the alert
# This will be sent in the notification
# Note: the lines of a folded (`>`) value must be indented under the key
description: >
  An example alert to show how to configure alerts in Airflow.
  This message will be part of the notification sent to people.
# Easy way to disable an alert
# If true, the alert is loaded and a notification is sent when its condition is met.
# If false, the alert is skipped and no notifications will be sent.
enabled: false
# Determines how often the alert should be checked (usually `@daily`)
interval: "@daily"
# The owner of the DAG (usually `airflow`)
owner: airflow
# Defines the condition to check against the returned value
# The available criteria are:
#   greater than: Notifies if the returned value is greater than the condition value
#   equal to: Notifies if the returned value is exactly equal to the condition value
#   less than: Notifies if the returned value is less than the condition value
criteria: greater than
# The value to compare against
value: 10
# The query to execute to be compared with the condition
# This query should return one and only one column and one and only one row
# Note: If you are using Snowflake, Airflow will only have access to the RAW database
query: >
  select count(*)
  from raw.public.clients
# The method of notification
#   slack: Sends a Slack message to the recipients (users or channels).
#   email: Not implemented. Sends an email to the recipients.
notifier: slack
# If notifier is email, this list has to be of email addresses
# If notifier is slack, this list has to be of Slack usernames and/or channels
# Note: Use double quotes ("") for each recipient
recipients:
  - "#data-alerts"
# Build one DAG per enabled alert definition found in the alerts folder.
# Sorting keeps the processing order stable, since glob order is arbitrary.
alert_files = sorted(glob.glob("dags/alerts/*.yaml"))
for alert_path in alert_files:
    # Read YAML file
    with open(alert_path, "r", encoding="utf-8") as stream:
        config = yaml.safe_load(stream)
    # An empty (or non-mapping) file parses to None/a scalar/a list; skip it
    # rather than letting one bad file abort the loading of every other alert.
    if not isinstance(config, dict):
        continue
    # Check if the alert is enabled (a missing `enabled` key means disabled)
    if config.get("enabled"):
        # Add to global scope
        globals()[config["nickname"]] = create_dag_alert(config)
# Part 1: task defaults shared by everything in this alert's DAG.
# The owner comes from the alert config; retries are set to 0.
default_args = dict(
    owner=config["owner"],
    start_date=datetime(2021, 2, 21),
    retries=0,
    retry_delay=timedelta(minutes=5),
)
# The DAG id is the alert nickname wrapped in a fixed prefix and suffix.
alert_dag_id = "".join(["alert_", config["nickname"], "_dag"])
# Create the DAG on the schedule requested by the alert config.
dag = DAG(
    dag_id=alert_dag_id,
    default_args=default_args,
    schedule_interval=config["interval"],
    catchup=False,
)
def _run_alert(**context):
    """Run the alert query and notify the recipients if the condition is met.

    Reads the alert definition from the enclosing ``config`` mapping, runs
    ``config["query"]`` through ``sf.query`` and compares the first column
    of the first returned row against ``config["value"]`` using the
    comparison named by ``config["criteria"]``.

    Args:
        **context: Keyword arguments passed in by the caller; not used here.

    Returns:
        The string ``"OK"`` once the check completes, whether or not a
        notification was sent.

    Raises:
        RuntimeError: If ``criteria`` is not one of the supported values, or
            if a notification is due and ``notifier`` is not supported.
    """
    # Get data using query
    log.info("Get data using query")
    query = config["query"]
    log.debug(f"Query: {query}")
    # Run query against Snowflake
    result = sf.query(query)
    # Only the first column of the first row is considered.
    result_value = result[0][0]

    # Compare return value to condition ("greater than" -> "greater_than")
    criteria = config["criteria"].strip().lower().replace(" ", "_")
    if criteria == "greater_than":
        notify = result_value > config["value"]
    elif criteria == "equal_to":
        notify = result_value == config["value"]
    elif criteria == "less_than":
        notify = result_value < config["value"]
    else:
        log.error(f"Unknown condition criteria: {criteria}")
        # Pass the value through a %s placeholder; an extra positional
        # argument without one makes logging fail to format the record.
        log.error("Original condition criteria: %s", config["criteria"])
        log.error(
            "Check the value of the 'criteria' parameter in the configuration file of this alert"
        )
        raise RuntimeError(f"Unknown condition criteria: {criteria}")

    # Notify only if condition was met
    if not notify:
        return "OK"

    # Check which notifier to use
    notifier = config["notifier"].strip().lower().replace(" ", "_")
    if notifier == "slack":
        # Call the Slack notifier
        notify_slack(
            config["name"],
            config["description"],
            config["recipients"],
            config["criteria"].strip().lower(),
            config["value"],
            result_value,
        )
    elif notifier == "email":
        # Call the email notifier
        notify_email(
            config["name"],
            config["description"],
            config["recipients"],
            config["criteria"].strip().lower(),
            config["value"],
            result_value,
        )
    else:
        # Unknown notifier
        log.error(f"Unknown notifier: {notifier}")
        log.error("Original notifier: %s", config["notifier"])
        log.error(
            "Check the value of the 'notifier' parameter in the configuration file of this alert"
        )
        raise RuntimeError(f"Unknown notifier: {notifier}")
    return "OK"
with dag:
t1 = PythonOperator(
task_id="run_alert", python_callable=_run_alert, provide_context=True
)
return dag
# Add to global scope
# Build the DAG for this alert and store it as a module-level global
# named after the alert's nickname.
# NOTE(review): presumably needed so Airflow can discover the DAG in this
# module; confirm against the Airflow version in use.
globals()[config["nickname"]] = create_dag_alert(config)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment