Skip to content

Instantly share code, notes, and snippets.

@PennyQ
Created August 24, 2022 08:35
Show Gist options
  • Save PennyQ/90ad645219a0d4aeee89f7a23e1b2a6b to your computer and use it in GitHub Desktop.
Save PennyQ/90ad645219a0d4aeee89f7a23e1b2a6b to your computer and use it in GitHub Desktop.
Python code for an Airflow failure callback that sends an alert to Slack
import logging
from airflow.hooks.base import BaseHook
from airflow.providers.slack.hooks.slack_webhook import SlackWebhookHook
# Airflow connection id used by send_fail_slack_alert: its password field is
# read as the Slack webhook token, and the id is also passed as http_conn_id.
SLACK_CONN_ID = "pco_slack_alerts"
# Logger named "airflow.task"; presumably this makes messages show up in the
# Airflow task log -- confirm against the deployment's logging config.
logger = logging.getLogger("airflow.task")
def send_fail_slack_alert(context):
    """Send a Slack alert describing the task that just failed.

    Written as an Airflow callback: it receives the task ``context`` mapping,
    reads the webhook token from the password field of the ``SLACK_CONN_ID``
    connection and posts a formatted message through ``SlackWebhookHook``.

    Args:
        context: Airflow context mapping. Must contain ``task_instance``
            (its ``task_id``, ``dag_id`` and ``log_url`` are read); the
            ``execution_date`` entry is included in the message if present.

    Returns:
        The result of ``SlackWebhookHook.execute()``.
    """
    # Look the task instance up once instead of once per attribute.
    task_instance = context.get("task_instance")
    task_id = task_instance.task_id
    # Lazy %-style arguments: the message is only built if INFO is enabled.
    logger.info("Task %s just failed, we will send an alert to slack", task_id)

    slack_webhook_token = BaseHook.get_connection(SLACK_CONN_ID).password

    # Slack only renders *bold* when there is no space after the opening
    # asterisk, so every label is written as "*Label*".
    slack_msg = (
        ":rotating_light: @here A DAG just failed in airflow\n"
        "*Task*: {task}\n"
        "*Dag*: {dag}\n"
        "*Execution Time*: {exec_date}\n"
        "*Log Url*: {log_url}\n"
    ).format(
        task=task_id,
        dag=task_instance.dag_id,
        exec_date=context.get("execution_date"),
        log_url=task_instance.log_url,
    )

    hook = SlackWebhookHook(
        http_conn_id=SLACK_CONN_ID,
        webhook_token=slack_webhook_token,
        message=slack_msg,
        icon_emoji=":rotating_light:",
        # link_names=True lets Slack resolve the "@here" mention in the text.
        link_names=True,
    )
    return hook.execute()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment