Created
August 24, 2022 08:35
-
-
Save PennyQ/90ad645219a0d4aeee89f7a23e1b2a6b to your computer and use it in GitHub Desktop.
python code for airflow callback on slack
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import logging | |
from airflow.hooks.base import BaseHook | |
from airflow.providers.slack.hooks.slack_webhook import SlackWebhookHook | |
SLACK_CONN_ID = "pco_slack_alerts" | |
logger = logging.getLogger("airflow.task") | |
def send_fail_slack_alert(context): | |
""" | |
It uses slack_conn_id to fetch the password and send a notification to slack | |
""" | |
task_id = context.get("task_instance").task_id | |
logger.info(f"Task {task_id} just failed, we will send an alert to slack") | |
slack_webhook_token = BaseHook.get_connection(SLACK_CONN_ID).password | |
slack_msg = """:rotating_light: @here A DAG just failed in airflow | |
*Task*: {task} | |
* Dag*: {dag} | |
* Execution Time*: {exec_date} | |
* Log Url*: {log_url} | |
""".format( | |
task=task_id, | |
dag=context.get("task_instance").dag_id, | |
exec_date=context.get("execution_date"), | |
log_url=context.get("task_instance").log_url, | |
) | |
hook = SlackWebhookHook( | |
http_conn_id=SLACK_CONN_ID, | |
webhook_token=slack_webhook_token, | |
message=slack_msg, | |
icon_emoji=":rotating_light:", | |
link_names=True, | |
) | |
return hook.execute() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment