Skip to content

Instantly share code, notes, and snippets.

@waltherg
Forked from boxysean/slack.py
Created October 4, 2016 18:37
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save waltherg/565e461157097926d1fb8026820c2ec6 to your computer and use it in GitHub Desktop.
Save waltherg/565e461157097926d1fb8026820c2ec6 to your computer and use it in GitHub Desktop.
PythonSlackOperator -- how I've integrated notifications into my PythonOperators
# airflow/plugins/slack.py
import logging
from airflow.operators.python_operator import PythonOperator
from airflow.plugins_manager import AirflowPlugin
from slackclient import SlackClient
from titan.utils import config
class PythonSlackOperator(PythonOperator):
def __init__(self, *args, **kwargs):
super(PythonSlackOperator, self).__init__(*args, **kwargs)
def _post_slack_message(self, text):
try:
sc = SlackClient(config.get('slack', 'token'))
sc.api_call('chat.postMessage',
channel=config.get('slack', 'default_channel'),
username='Airflow',
icon_url='https://raw.githubusercontent.com/airbnb/airflow/master/airflow/www/static/pin_100.png',
text=text)
except:
logging.exception('Non-fatal error: could not post message to Slack (text: {text})'.format(**locals()))
def execute(self, context, *args, **kwargs):
task_instance = context.get('task_instance')
dag_run = context.get('dag_run')
dag = context.get('dag')
if dag_run is None:
dag_run_id = 'N/A'
dag_run_external_trigger = 'N/A'
else:
dag_run_id = dag_run.run_id
dag_run_external_trigger = ':zap:' if dag_run.external_trigger else ':clock3:'
base_url = config.get('airflow', 'base_url')
log_url = '{base_url}{task_instance.log_url}'.format(**locals())
self._post_slack_message(':chicken:{dag_run_external_trigger} Starting task (dag=*{task_instance.dag_id}* task=*{task_instance.task_id}* dagid=*{dag_run_id}*) <{log_url}|logs>'.format(**locals()))
result = super(PythonSlackOperator, self).execute(context, *args, **kwargs)
self._post_slack_message(':poultry_leg:{dag_run_external_trigger} Finished task (dag=*{task_instance.dag_id}* task=*{task_instance.task_id}* dagid=*{dag_run_id}*) <{log_url}|logs>'.format(**locals()))
class PythonSlackAPlugin(AirflowPlugin):
name = 'dp_python_slack'
operators = [PythonSlackOperator]
flask_blueprints = []
hooks = []
executors = []
admin_views = []
menu_links = []
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment