Skip to content

Instantly share code, notes, and snippets.

@ddelange
Last active November 16, 2023 16:57
Show Gist options
  • Save ddelange/6e33f8f0df3a97d4a371d055aa2d58ac to your computer and use it in GitHub Desktop.
Save ddelange/6e33f8f0df3a97d4a371d055aa2d58ac to your computer and use it in GitHub Desktop.
Airflow Slack notifications

Airflow Slack notifications

Installation

Make sure slackclient v1.3.1 is installed (for apache-airflow 1.10).

pip install -U "apache-airflow[slack,...]"

General usage

Generate a slack legacy token for your workspace and create a SLACK_LEGACY_TOKEN environmental variable.

import os
from airflow.operators.slack_operator import SlackAPIPostOperator

SLACK_LEGACY_TOKEN = os.environ['SLACK_LEGACY_TOKEN']

def send_slack_notification(
    message='',
    attachments=None,
    channel=None,
):
    """Send message to Slack.

    message: Text of the message to send. See below for an explanation of
    formatting. This field is usually required, unless you're providing only
    attachments instead. Provide no more than 40,000 characters or risk truncati

    attachments: [list of max 100] dict[s] slack message attachment[s]
    see https://api.slack.com/docs/message-attachments#attachment_structure

    channel:  a channel in your workspace starting with `#`
    """
    assert isinstance(message, str) and message or attachments
    if isinstance(attachments, dict):
        attachments = [attachments]
    channel = channel or '#airflow-notifications'
    notification_operator = SlackAPIPostOperator(
        task_id='slack_notification',
        username='airflow-notifications',
        icon_url='https://github.com/apache/airflow/raw/v1-10-stable/airflow/www/static/pin_100.png',
        token=SLACK_LEGACY_TOKEN,
        channel=channel,
        text=message,
        attachments=attachments,
    )
    notification_operator.execute()

Example implementation

Failure notification

Send a notification with selected context details in a message attachment when a task fails using default_args.

default_args = {
    ...
    'on_failure_callback': failed_task_slack_notification,
}

def failed_task_slack_notification(kwargs):
    """Send failed task notification with context provided by operator."""
    domain = extract_domain(
        kwargs['ti'].log_url,
        with_subdomain=True,
    )
    dag = kwargs['ti'].dag_id
    run_id = kwargs['run_id']
    task = kwargs['ti'].task_id
    exec_date = kwargs['execution_date']
    try_number = kwargs['ti'].try_number - 1
    max_tries = kwargs['ti'].max_tries + 1
    exception = kwargs['exception']
    log_url = kwargs['ti'].log_url
    # command = kwargs['ti'].command(),

    message = (
        f'`DAG`  {dag}'
        f'\n`Run Id`  {run_id}'
        f'\n`Task`  {task} _(try {try_number} of {max_tries})_'
        f'\n`Execution`  {exec_date}'
        f'\n```{exception}```'
        # f'`Command`  {command}\n'
    )

    attachments = {
        'mrkdwn_in': ['text', 'pretext'],
        'pretext': ':boom: *Failure*',
        'title': domain.split('.')[0],
        'title_link': f'https://{domain}',
        'text': message,
        'actions': [
            {
                'type': 'button',
                'name': 'view log',
                'text': 'View log :airflow:',
                'url': log_url,
                'style': 'primary',
            },
        ],
        'color': 'danger',  # 'good', 'warning', 'danger', or hex ('#439FE0')
        'fallback': 'details',  # Required plain-text summary of the attachment
    }

    send_slack_notification(attachments=attachments)
@StefanoGITA
Copy link

Hi @ddelange, where i can find the method extract_domain?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment