Skip to content

Instantly share code, notes, and snippets.

@zackhillman
Created March 12, 2022 00:07
Show Gist options
  • Save zackhillman/653be4c3773cbfd9aa2da2857ea0bc1c to your computer and use it in GitHub Desktop.
Save zackhillman/653be4c3773cbfd9aa2da2857ea0bc1c to your computer and use it in GitHub Desktop.
import sentry_sdk
from airflow import DAG
from airflow.configuration import conf
from airflow.exceptions import AirflowException
from airflow.hooks.base import BaseHook
from airflow.models import TaskInstance, DagRun, BaseOperator
from airflow.utils.db import provide_session
from airflow.utils.state import State
from sentry_sdk.integrations.flask import FlaskIntegration
from sentry_sdk.integrations.logging import ignore_logger
from sentry_sdk.integrations.sqlalchemy import SqlalchemyIntegration
from sqlalchemy import exc
from sqlalchemy.orm import Session
# Keys looked up in the DAG's default_args and attached to the scope as tags.
SCOPE_DEFAULT_ARGS = frozenset(
    [
        "product_area",
        "owner",
    ]
)

# DagRun attributes attached to the scope as tags.
SCOPE_DAG_RUN_TAGS = frozenset(
    [
        "data_interval_end",
        "data_interval_start",
        "execution_date",
    ]
)

# TaskInstance attributes attached to the scope as tags.
SCOPE_TASK_INSTANCE_TAGS = frozenset(
    [
        "task_id",
        "dag_id",
        "try_number",
    ]
)

# TaskInstance attributes recorded in each "completed_tasks" breadcrumb.
SCOPE_CRUMBS = frozenset(
    [
        "task_id",
        "state",
        "operator",
        "duration",
    ]
)
def add_tagging(task_instance: TaskInstance):
    """
    Tag the current Sentry scope with details of ``task_instance``.

    One tag is set for every name in SCOPE_TASK_INSTANCE_TAGS (read from the
    task instance), SCOPE_DAG_RUN_TAGS (read from its DagRun) and
    SCOPE_DEFAULT_ARGS (looked up in the DAG's ``default_args``; the tag value
    is None when the key is absent). An extra "operator" tag carries the class
    name of the task's operator.
    """
    run: DagRun = task_instance.dag_run
    operator: BaseOperator = task_instance.task
    owning_dag: DAG = operator.dag

    with sentry_sdk.configure_scope() as scope:
        for name in SCOPE_TASK_INSTANCE_TAGS:
            scope.set_tag(name, getattr(task_instance, name))

        for name in SCOPE_DAG_RUN_TAGS:
            scope.set_tag(name, getattr(run, name))

        for name in SCOPE_DEFAULT_ARGS:
            scope.set_tag(name, owning_dag.default_args.get(name))

        scope.set_tag("operator", operator.__class__.__name__)
@provide_session
def add_breadcrumbs(task_instance: TaskInstance, session: Session = None):
    """
    Record a Sentry breadcrumb for each finished task in the same DagRun.

    Looks up the DagRun of ``task_instance``, fetches its task instances whose
    state is SUCCESS or FAILED, and emits one "completed_tasks" breadcrumb per
    instance containing the attributes named in SCOPE_CRUMBS. Does nothing
    when no session is available.
    """
    if session is None:
        return

    dag_run = task_instance.get_dagrun(session)
    finished = dag_run.get_task_instances(
        state={State.SUCCESS, State.FAILED},
        session=session,
    )

    for finished_ti in finished:
        sentry_sdk.add_breadcrumb(
            category="completed_tasks",
            data={field: getattr(finished_ti, field) for field in SCOPE_CRUMBS},
            level="info",
        )
@provide_session
def sentry_run_raw_task(task_instance: TaskInstance, *args, session: Session = None, **kwargs):
    """
    Replacement for TaskInstance._run_raw_task that reports failures to Sentry.

    Runs the original method (saved in the module-level name
    ``original_run_raw_task``) inside a fresh Sentry scope that has been
    enriched with tags and breadcrumbs for ``task_instance``.

    :param task_instance: the TaskInstance being run (``self`` of the patched method).
    :param args: positional arguments forwarded unchanged to the original method.
    :param session: SQLAlchemy session, forwarded to the original method.
    :param kwargs: keyword arguments forwarded unchanged to the original method.
    :return: whatever the original ``_run_raw_task`` returns.
    :raises Exception: any exception from the original method is captured by
        Sentry and then re-raised unchanged.
    """
    # Avoid leaking tags by using push_scope: tags and breadcrumbs set below
    # live only for the duration of this task run.
    with sentry_sdk.push_scope():
        add_tagging(task_instance)
        add_breadcrumbs(task_instance)
        try:
            # Hand the wrapped method's result back to the caller; the patched
            # method must stay a transparent stand-in for the original.
            return original_run_raw_task(task_instance, *args, session=session, **kwargs)
        except Exception as e:
            sentry_sdk.capture_exception(e)
            raise
class KlaviyoSentryHook(BaseHook):
    """
    Wrap around the Sentry SDK.

    Instantiating the hook has process-wide side effects: it initialises
    ``sentry_sdk`` and replaces ``TaskInstance._run_raw_task`` with
    ``sentry_run_raw_task``.
    """

    def __init__(self, sentry_conn_id="sentry_dsn"):
        """
        Initialise Sentry and patch TaskInstance.

        :param sentry_conn_id: id of the Airflow connection whose ``host``
            field holds the Sentry DSN.
        """
        # Let the base class run its own initialisation before the hook is used.
        super().__init__()

        # Keep these loggers' records from being turned into Sentry events.
        ignore_logger("airflow.task")
        ignore_logger("airflow.jobs.backfill_job.BackfillJob")

        integrations = [FlaskIntegration(), SqlalchemyIntegration()]

        executor_name = conf.get("core", "EXECUTOR")
        if executor_name == "CeleryExecutor":
            # Imported lazily so the Celery integration is only loaded when
            # the Celery executor is configured.
            from sentry_sdk.integrations.celery import CeleryIntegration

            sentry_celery = CeleryIntegration()
            integrations.append(sentry_celery)

        try:
            conn = self.get_connection(sentry_conn_id)
            sentry_sdk.init(
                dsn=conn.host,
                integrations=integrations,
            )
        except (AirflowException, exc.OperationalError, exc.ProgrammingError):
            # The connection could not be resolved; initialise without an
            # explicit DSN instead of failing.
            self.log.debug("Sentry defaulting to environment variable.")
            sentry_sdk.init(integrations=integrations)

        # Route every task run through the Sentry wrapper and leave a marker
        # on the class so the module-level guard can tell it is patched.
        TaskInstance._run_raw_task = sentry_run_raw_task
        TaskInstance._sentry_integration_ = True
# Install the integration only if TaskInstance has not been patched yet; the
# marker attribute is set by KlaviyoSentryHook.__init__.
if not getattr(TaskInstance, "_sentry_integration_", False):
    # Capture the unpatched method BEFORE the hook replaces it:
    # sentry_run_raw_task calls it through this module-level name.
    original_run_raw_task = TaskInstance._run_raw_task
    # Instantiating the hook initialises the Sentry SDK and applies the patch.
    KlaviyoSentryHook()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment