Skip to content

Instantly share code, notes, and snippets.

@victorouse
Last active September 26, 2023 03:13
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save victorouse/8f2ebe970949bd6c2af16b1ea35ff7ab to your computer and use it in GitHub Desktop.
Save victorouse/8f2ebe970949bd6c2af16b1ea35ff7ab to your computer and use it in GitHub Desktop.
Custom Slack operator for Airflow DAGs
# dags/operators/custom_slack_operator.py
from airflow.providers.slack.operators.slack import SlackAPIPostOperator
from utils.slack_message import SlackMessage, Attachment, Block
class CustomSlackOperator(SlackAPIPostOperator):
github_url = "https://github.com/your-org/your-repo"
airflow_webserver_url = "https://xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx-dot-australia-southeast1.composer.googleusercontent.com"
status_color_map = {
"success": "#2EB67D",
"running": "#ECB22E",
"failed": "#E01E5A",
}
def __init__(
self,
channel="#data-ops",
status=None,
task_name=None,
subscribers=None,
*args,
**kwargs,
):
self.subscribers = subscribers
self.status = status
self.task_name = task_name
super().__init__(
*args, slack_conn_id="slack_bot_token", channel=channel, **kwargs
)
def execute(self, context):
status = self.status or context["ti"].state
color = self.status_color_map.get(status)
dag_name = context["dag"].dag_id
fallback = f"DAG {dag_name} has {status}"
log_url = f"{self.airflow_webserver_url}/dags/{dag_name}/graph"
self.text = ""
self.attachments = self.create_slack_message(
dag_name=dag_name,
task_name=self.task_name,
status=status,
color=color,
fallback=fallback,
subscribers=self.subscribers,
log_url=log_url,
)
super().execute()
@classmethod
def failure_notification(cls, context):
dag_run = context["dag_run"]
failed_tis = dag_run.get_task_instances(state="failed")
failed_task_ids = [ti.task_id for ti in failed_tis]
task_name = failed_task_ids[0] if len(failed_task_ids) > 0 else None
cls(
task_id="slack_notification_on_failure",
task_name=task_name,
status="failed",
subscribers=["@victor", "@data-crunchers"],
).execute(context)
@staticmethod
def create_slack_message(
*,
color,
fallback,
status,
dag_name,
task_name=None,
subscribers=None,
log_url=None,
):
message = SlackMessage()
attachment = Attachment(color=color, fallback=fallback)
block = Block()
block.add_field(
"Repository",
f"<{CustomSlackOperator.github_url}|your-org/your-repo>",
)
block.add_field("Event", "dag_run", code_format=True)
block.add_field("DAG", dag_name, code_format=True)
block.add_field("Status", status, code_format=True)
if task_name:
block.add_field("Task", task_name, code_format=True) if task_name else None
if subscribers:
block.add_field(
"Subscribers",
", ".join(subscribers),
)
attachment.add_block(block)
if log_url:
divider = {"type": "divider"}
attachment.blocks.append(divider)
log_button = {
"type": "actions",
"elements": [
{
"type": "button",
"text": {"type": "plain_text", "text": "Logs"},
"url": log_url,
}
],
}
attachment.blocks.append(log_button)
message.add_attachment(attachment)
return message.attachments
# dags/utils/slack_message.py
class Block:
def __init__(self, block_type="section"):
self.block_type = block_type
self.fields = []
def add_field(self, title, value, code_format=False):
if code_format:
value = f"`{value}`"
self.fields.append({"type": "mrkdwn", "text": f"*{title}*\n{value}"})
def to_dict(self):
return {"type": self.block_type, "fields": self.fields}
class Attachment:
def __init__(self, color, fallback):
self.color = color
self.fallback = fallback
self.blocks = []
def add_block(self, block):
self.blocks.append(block.to_dict())
def to_dict(self):
return {"color": self.color, "fallback": self.fallback, "blocks": self.blocks}
class SlackMessage:
def __init__(self):
self.attachments = []
def add_attachment(self, attachment):
self.attachments.append(attachment.to_dict())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment