Created
July 11, 2021 20:13
-
-
Save juangesino/544aca8124459c2afa72a61e4fbe8d88 to your computer and use it in GitHub Desktop.
How we use Airflow for SQL alerts and Slack notifications
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Function that takes the config and creates a DAG | |
def create_dag_alert(config): | |
# Part 1: Define the DAG | |
... | |
# Part 2: Task function | |
... | |
# Part 3: Setup the DAG and task | |
... |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Give the alert a name | |
# This will be sent in the notification | |
name: Example Alert | |
# This will be the name of the generated DAG | |
# Should be a short and machine-friendly name | |
# Use underscores (`_`) instead of spaces and use lowercase | |
# We will add an `alert_` prefix and a `_dag` suffix, so no need to include that | |
nickname: example | |
# Add a description to describe the alert | |
# This will be sent in the notification | |
description: > | |
An example alert to show how to configure alerts in Airflow. | |
This message will be part of the notification sent to people. | |
# Easy way to disable an alert | |
# If true, a notification will be sent. | |
# If false, no notifications will be sent. | |
enabled: false | |
# Determines how often the alert should be checked (usually `@daily`) | |
interval: "@daily" | |
# The owner of the DAG (usually `airflow`) | |
owner: airflow | |
# Defines the condition to check against the returned value | |
# The available criteria are: | |
# greater than: Notifies if the returned value is greater than the condition value | |
# equal to: Notifies if the returned value is exactly equal to the condition value | |
# less than: Notifies if the returned value is less than the condition value | |
criteria: greater than | |
# The value to compare against | |
value: 10 | |
# The query to execute to be compared with the condition | |
# This query should return one and only one column and one and only one row | |
# Note: If you are using Snowflake, Airflow will only have access to the RAW database | |
query: > | |
select count(*) | |
from raw.public.clients | |
# The method of notification | |
# slack: Sends a Slack message to the recipients (users or channels). | |
# email: Not implemented. Sends an email to the recipients. | |
notifier: slack | |
# If notifier is email, this list has to be of emails addresses | |
# If notifier is slack, this list has to be of Slack usernames and/or channels | |
# Note: Use double quotes ("") for each recipient | |
recipients: | |
- "#data-alerts" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
alert_files = glob.glob("dags/alerts/*.yaml") | |
for alert_path in alert_files: | |
# Read YAML file | |
with open(alert_path, "r") as stream: | |
config = yaml.safe_load(stream) | |
# Check if the alert is enabled | |
if config["enabled"]: | |
# Add to global scope | |
globals()[config["nickname"]] = create_dag_alert(config) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Define default arguments for DAG | |
default_args = { | |
"owner": config["owner"], | |
"start_date": datetime(2021, 2, 21), | |
"retries": 0, | |
"retry_delay": timedelta(minutes=5), | |
} | |
# Initialize the DAG | |
dag = DAG( | |
dag_id="alert_" + config["nickname"] + "_dag", | |
default_args=default_args, | |
schedule_interval=config["interval"], | |
catchup=False, | |
) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def _run_alert(**context): | |
# Get data using query | |
log.info("Get data using query") | |
query = config["query"] | |
log.debug(f"Query: {query}") | |
# Run query against Snowflake | |
result = sf.query(query) | |
result_value = result[0][0] | |
# Default action is to not notify | |
notify = False | |
# Compare return value to condition | |
criteria = config["criteria"].strip().lower().replace(" ", "_") | |
if criteria == "greater_than": | |
if result_value > config["value"]: | |
notify = True | |
elif criteria == "equal_to": | |
if result_value == config["value"]: | |
notify = True | |
elif criteria == "less_than": | |
if result_value < config["value"]: | |
notify = True | |
else: | |
log.error(f"Unknown condition criteria: {criteria}") | |
log.error("Original condition criteria:", config["criteria"]) | |
log.error( | |
"Check the value of the 'criteria' parameter in the configuration file of this alert" | |
) | |
raise RuntimeError(f"Unknown condition criteria: {criteria}") | |
# Notify only if condition was met | |
if notify: | |
# Check which notifier to use | |
notifier = config["notifier"].strip().lower().replace(" ", "_") | |
# Check Slack notifier | |
if notifier == "slack": | |
# Call the Slack notifier | |
notify_slack( | |
config["name"], | |
config["description"], | |
config["recipients"], | |
config["criteria"].strip().lower(), | |
config["value"], | |
result_value, | |
) | |
# Check the email notifier | |
elif notifier == "email": | |
# Call the email notifier | |
notify_email( | |
config["name"], | |
config["description"], | |
config["recipients"], | |
config["criteria"].strip().lower(), | |
config["value"], | |
result_value, | |
) | |
# Unknown notifier | |
else: | |
log.error(f"Unknown notifier: {notifier}") | |
log.error("Original notifier:", config["notifier"]) | |
log.error( | |
"Check the value of the 'notifier' parameter in the configuration file of this alert" | |
) | |
raise RuntimeError(f"Unknown notifier: {notifier}") | |
return "OK" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
with dag: | |
t1 = PythonOperator( | |
task_id="run_alert", python_callable=_run_alert, provide_context=True | |
) | |
return dag |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Add to global scope | |
globals()[config["nickname"]] = create_dag_alert(config) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment