@wtfzambo · Last active September 28, 2023
Easily format Slack messages for https://dlthub.com/ pipelines.
# formatters.py
import datetime
from abc import ABC, abstractmethod
from typing import Any, Literal, Sequence, cast, overload

from dlt.common.pipeline import LoadInfo
from dlt.common.storages.load_storage import LoadJobInfo

from slack_table import ColumnSpec, TRow


class BaseDataFormatter(ABC):
    """Turns a dlt `LoadInfo` into the column specs and rows expected by `slack_table`."""

    _COL_SPECS: Sequence[ColumnSpec]
    _formatted_data: Sequence[TRow]
    _global_max_width = 50
    _width_multiplier = 1.2

    def __init__(self, data: LoadInfo):
        if not getattr(self, "_COL_SPECS", None):
            raise NotImplementedError(
                f"_COL_SPECS must be set in the __init__ method of {self.__class__.__name__}"
            )
        self.data = data

    @abstractmethod
    def _format_data(self) -> None:
        ...

    def _update_max_col_width(self) -> None:
        # Size each column to its widest value, capped at _global_max_width.
        for col_spec in self._COL_SPECS:
            max_width = col_spec.get("width", 0)
            for row in self._formatted_data:
                data_index = col_spec["data_index"]
                value = row[data_index]
                max_width = min(max(max_width, len(value)), self._global_max_width)
            col_spec["width"] = int(max_width * self._width_multiplier)

    def format_data(self) -> tuple[Sequence[ColumnSpec], Sequence[TRow]]:
        self._format_data()
        self._update_max_col_width()
        return self._COL_SPECS, self._formatted_data


class JobFailedDataFormatter(BaseDataFormatter):
    def __init__(self, data: LoadInfo):
        # fmt: off
        self._COL_SPECS: Sequence[ColumnSpec] = [
            {"title": "Table", "data_index": "table_name", "base_path": "job_file_info"},
            {"title": "Started on", "data_index": "created_at"},
            {"title": "Message", "data_index": "failed_message"},
        ]
        # fmt: on
        super().__init__(data)
        self._failed_jobs = self._get_failed_jobs()

    def _get_failed_jobs(self) -> list[LoadJobInfo]:
        return [
            job
            for package in self.data.load_packages
            for job in package.jobs["failed_jobs"]
        ]

    def _format_obj(self, obj: Any) -> str:
        # Normalize values into single-line strings for the table.
        if isinstance(obj, datetime.datetime):
            obj = obj.strftime("%Y-%m-%d %H:%M:%S")
        if callable(obj):
            obj = cast(str, obj())
        return str(obj).replace("\n", " ")

    def _get_nested_attr(self, obj: Any, attr_path: str) -> str:
        # Walk a dotted attribute path, e.g. "job_file_info.table_name".
        attrs = attr_path.split(".")
        for attr in attrs:
            obj = getattr(obj, attr, None)
        return self._format_obj(obj)

    def _format_data(self) -> None:
        formatted_data = []
        for job in self._failed_jobs:
            job_dict = {}
            for col_spec in self._COL_SPECS:
                base_path = col_spec.get("base_path")
                data_index = col_spec["data_index"]
                path = f"{base_path}.{data_index}" if base_path else data_index
                job_dict[data_index] = self._get_nested_attr(job, path)
            formatted_data.append(job_dict)
        self._formatted_data = formatted_data


class SchemaChangesFormatter(BaseDataFormatter):
    def __init__(self, data: LoadInfo):
        self._COL_SPECS: Sequence[ColumnSpec] = [
            {"title": "Table", "data_index": "table_name"},
            {"title": "Source", "data_index": "schema_name"},
            {"title": "Column", "data_index": "column"},
            {"title": "Data type", "data_index": "data_type"},
        ]
        super().__init__(data)

    def _format_data(self) -> None:
        formatted_data = []
        for package in self.data.load_packages:
            if not package.schema_update:
                continue
            for table_name, table in package.schema_update.items():
                for i, (column_name, column) in enumerate(table["columns"].items()):
                    changes_dict = {}
                    # Only print the table and source names on the first row of each group.
                    changes_dict["table_name"] = table_name if i == 0 else ""
                    changes_dict["schema_name"] = package.schema_name if i == 0 else ""
                    changes_dict["column"] = column_name
                    changes_dict["data_type"] = column["data_type"]
                    formatted_data.append(changes_dict)
        self._formatted_data = formatted_data


@overload
def get_data_formatter(
    alert_type: Literal["failed_jobs"], data: LoadInfo
) -> JobFailedDataFormatter:
    ...


@overload
def get_data_formatter(
    alert_type: Literal["schema_changes"], data: LoadInfo
) -> SchemaChangesFormatter:
    ...


def get_data_formatter(
    alert_type: Literal["failed_jobs", "schema_changes"], data: LoadInfo
) -> JobFailedDataFormatter | SchemaChangesFormatter:
    if alert_type == "failed_jobs":
        return JobFailedDataFormatter(data)
    elif alert_type == "schema_changes":
        return SchemaChangesFormatter(data)
    else:
        raise ValueError(f"Unsupported alert type: {alert_type}")
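The formatters can also be exercised standalone; a minimal sketch, assuming `load_info` came from an earlier `pipeline.run(...)` call (the variable name is illustrative):

    # `load_info` is assumed to be a LoadInfo from a previous pipeline.run(...).
    formatter = get_data_formatter("failed_jobs", load_info)
    col_specs, rows = formatter.format_data()  # column widths are computed here
    # col_specs and rows plug straight into slack_table(title, subtitle, *...)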
# monitoring.py
import os
import traceback
from typing import Any, Callable, Sequence

import dlt
from dlt.common.pipeline import LoadInfo
from dlt.common.runtime.slack import send_slack_message
from loguru import logger

from formatters import get_data_formatter
from slack_table import slack_table

CI_ENVIRONMENT = os.getenv("CI_ENVIRONMENT")


def _check_failed_jobs(load_info: LoadInfo) -> None:
    if not load_info.has_failed_jobs:
        return
    formatter = get_data_formatter("failed_jobs", load_info)
    formatted_data = formatter.format_data()
    slack_message = slack_table(
        "⚠️ Warning - the following jobs have failed",
        f"Dataset ⟶ `{load_info.pipeline.dataset_name}`",
        *formatted_data,
    )
    logger.warning("\n" + slack_message.replace("```", "\n"))
    send_slack_message(
        load_info.pipeline.runtime_config.slack_incoming_hook, slack_message
    )


def _check_schema_changes(load_info: LoadInfo) -> None:
    # Skip the first run: every table and column would show up as "new".
    if load_info.pipeline.first_run:
        return
    formatter = get_data_formatter("schema_changes", load_info)
    formatted_data = formatter.format_data()
    if not formatted_data[1]:  # no rows means no schema changes
        return
    slack_message = slack_table(
        "🧩 Info - schema changes (new columns/tables) detected",
        f"Dataset ⟶ `{load_info.pipeline.dataset_name}`",
        *formatted_data,
    )
    logger.warning("\n" + slack_message.replace("```", "\n"))
    send_slack_message(
        load_info.pipeline.runtime_config.slack_incoming_hook, slack_message
    )


def _alert_failure(exception: Exception, tb: str) -> None:
    slack_message = (
        "<!here>\n\n"
        + f"*🔥 Error - pipeline crashed in `{CI_ENVIRONMENT}`*\n\n"
        + f"\tError message ⟶ `{repr(exception)}`\n\n"
        + f"```{tb}```"
    )
    send_slack_message(dlt.config["runtime.slack_incoming_hook"], slack_message)


def pipeline_sentry(fn: Callable[..., Sequence[LoadInfo]]) -> Callable[..., None]:
    """Decorator: logs each LoadInfo, posts Slack alerts, and re-raises on crash."""

    def wrapper(*args: Any, **kwargs: Any) -> None:
        try:
            logger.info(f"Starting pipeline in {fn.__module__}")
            pipeline_load_infos = fn(*args, **kwargs)
            for load_info in pipeline_load_infos:
                logger.success(load_info)
                _check_failed_jobs(load_info)
                _check_schema_changes(load_info)
        except Exception as e:
            logger.exception(e)
            tb = traceback.format_exc()
            _alert_failure(e, tb)
            raise

    return wrapper
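The webhook URL is pulled from dlt's runtime config (`runtime_config.slack_incoming_hook` / `dlt.config["runtime.slack_incoming_hook"]`). A sketch of wiring it through an environment variable, assuming dlt's usual `SECTION__KEY` env-var convention; it can equally live in `.dlt/secrets.toml`:

    import os

    # Hypothetical placeholder URL; dlt resolves runtime.slack_incoming_hook
    # from this env var or from .dlt/secrets.toml.
    os.environ["RUNTIME__SLACK_INCOMING_HOOK"] = "https://hooks.slack.com/services/..."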
# slack_table.py
# inspired by https://github.com/idw111/slack-table
from typing import Literal, NotRequired, Sequence, TypedDict, cast


class ColumnSpec(TypedDict):
    title: str
    data_index: str
    base_path: NotRequired[str]
    width: NotRequired[int]
    align: NotRequired[str]


TRow = dict[str, str]


def _pad_left(text: str = "", max_length: int = 13) -> str:
    return text[:max_length].rjust(max_length)


def _pad_right(text: str = "", max_length: int = 13) -> str:
    return text[:max_length].ljust(max_length)


def _fill_dash(length: int) -> str:
    return "-" * length


def _get_lines(columns: Sequence[ColumnSpec]) -> str:
    # Separator row: the sum of the column widths plus one space between columns.
    total_length = sum(col.get("width", 10) for col in columns) + len(columns) - 1
    return _fill_dash(total_length)


def _get_col_value(col: ColumnSpec, row: TRow) -> str:
    align = col.get("align", "left")
    width = col.get("width", 10)
    data_index = col["data_index"]
    pad = _pad_left if align == "right" else _pad_right
    value = str(row.get(data_index, ""))
    return pad(value, width)


def _get_row(columns: Sequence[ColumnSpec], row: TRow | Literal["-"]) -> str:
    if row == "-":
        return _get_lines(columns)
    return " ".join(_get_col_value(column, cast(TRow, row)) for column in columns)


def _get_header_col(col: ColumnSpec) -> str:
    align = col.get("align", "left")
    width = col.get("width", 10)
    title = col.get("title", "")
    pad = _pad_left if align == "right" else _pad_right
    return pad(title, width)


def _get_header_row(columns: Sequence[ColumnSpec]) -> str:
    return " ".join(_get_header_col(column) for column in columns)


# WARN: currently breaks formatting in Slack if the message is longer than 4k characters.
def slack_table(
    title: str = "",
    subtitle: str = "",
    columns: Sequence[ColumnSpec] = (),
    data_source: Sequence[TRow] = (),
) -> str:
    table_data = (
        [_get_header_row(columns)]
        + [_get_row(columns, "-")]
        + [_get_row(columns, row) for row in data_source]
    )
    return (
        f"<!here>\n\n*{title}*\n\n"
        + f"\t{subtitle}\n\n```"
        + "\n".join(table_data)
        + "\n```\n"
    )
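To see the table shape without running a pipeline, `slack_table` can be called with literal data; a small sketch (column widths and values are made up):

    columns: Sequence[ColumnSpec] = [
        {"title": "Table", "data_index": "table_name", "width": 12},
        {"title": "Message", "data_index": "failed_message", "width": 24},
    ]
    rows: Sequence[TRow] = [
        {"table_name": "orders", "failed_message": "schema mismatch"},
    ]
    # Produces <!here>, a bold title, the subtitle, then the monospace table
    # (header row, dashed separator, data rows) inside a ``` block.
    print(slack_table("⚠️ Failed jobs", "Dataset ⟶ `analytics`", columns, rows))

Keep the 4k-character warning above in mind: with many failed jobs or long messages, the table may need to be truncated or split before sending.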
Usage

(Install loguru for hassle-free, out-of-the-box pretty logging.)

from dlt.common.pipeline import LoadInfo
from typing import Sequence

from monitoring import pipeline_sentry


@pipeline_sentry
def run_pipeline() -> Sequence[LoadInfo]:
    ...
    load_info = pipeline.run(...)
    return load_info,  # <- note the trailing comma:
    # the decorated function must return a sequence of `LoadInfo` for pipeline_sentry to work.
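Because the decorator expects a sequence, a module that runs several pipelines can return all of its load infos at once; a hedged sketch with made-up pipeline and source names:

    import dlt

    @pipeline_sentry
    def run_pipelines() -> Sequence[LoadInfo]:
        # Hypothetical pipelines and sources, for illustration only.
        info_a = dlt.pipeline(pipeline_name="orders").run(orders_source())
        info_b = dlt.pipeline(pipeline_name="users").run(users_source())
        return info_a, info_b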
