# Airflow Sentry integration, from GitHub gist zackhillman/653be4c3773cbfd9aa2da2857ea0bc1c
# (created March 12, 2022).
#
# NOTE: GitHub flagged the original gist as containing bidirectional Unicode text, which can
# make code be interpreted differently from how it appears. Review this file in an editor
# that reveals hidden Unicode characters before relying on it.
import logging

import sentry_sdk
from airflow import DAG
from airflow.configuration import conf
from airflow.exceptions import AirflowException
from airflow.hooks.base import BaseHook
from airflow.models import TaskInstance, DagRun, BaseOperator
from airflow.utils.db import provide_session
from airflow.utils.state import State
from sentry_sdk.integrations.flask import FlaskIntegration
from sentry_sdk.integrations.logging import ignore_logger
from sentry_sdk.integrations.sqlalchemy import SqlalchemyIntegration
from sqlalchemy import exc
from sqlalchemy.orm import Session
# Keys looked up in the DAG's ``default_args`` and copied onto the Sentry scope as tags.
SCOPE_DEFAULT_ARGS = frozenset({"product_area", "owner"})
# DagRun attributes copied onto the Sentry scope as tags.
SCOPE_DAG_RUN_TAGS = frozenset({"data_interval_end", "data_interval_start", "execution_date"})
# TaskInstance attributes copied onto the Sentry scope as tags.
SCOPE_TASK_INSTANCE_TAGS = frozenset({"task_id", "dag_id", "try_number"})
# TaskInstance attributes recorded in every "completed_tasks" breadcrumb.
SCOPE_CRUMBS = frozenset({"task_id", "state", "operator", "duration"})
def add_tagging(task_instance: TaskInstance):
    """
    Tag the current Sentry scope with metadata about ``task_instance``.

    One tag is set per name in:

    * ``SCOPE_TASK_INSTANCE_TAGS``, read from the task instance;
    * ``SCOPE_DAG_RUN_TAGS``, read from the task instance's ``dag_run``;
    * ``SCOPE_DEFAULT_ARGS``, read from the DAG's ``default_args``.

    An ``operator`` tag holding the task's class name is also set.

    Tags whose value is ``None`` are skipped. For example, a DAG whose ``default_args``
    has no ``product_area`` gets no ``product_area`` tag rather than a null one.

    :param task_instance: the TaskInstance being run.
    """
    dag_run: DagRun = task_instance.dag_run
    task: BaseOperator = task_instance.task
    dag: DAG = task.dag

    # Collect every value first, then apply them to the scope in one pass.
    tags = {name: getattr(task_instance, name) for name in SCOPE_TASK_INSTANCE_TAGS}
    tags.update({name: getattr(dag_run, name) for name in SCOPE_DAG_RUN_TAGS})
    tags.update({name: dag.default_args.get(name) for name in SCOPE_DEFAULT_ARGS})
    tags["operator"] = task.__class__.__name__

    with sentry_sdk.configure_scope() as scope:
        for name, value in tags.items():
            # Sentry tags are string key/value pairs; a missing value carries no
            # information, so don't emit a null tag for it.
            if value is not None:
                scope.set_tag(name, value)
@provide_session
def add_breadcrumbs(task_instance: TaskInstance, session: Session = None):
    """
    Add one Sentry breadcrumb per finished task instance of the same DAG run.

    The DagRun of ``task_instance`` is queried for task instances in the SUCCESS or
    FAILED state. Each one becomes a ``completed_tasks`` breadcrumb at level ``info``,
    whose data holds the attributes named in ``SCOPE_CRUMBS``.

    :param task_instance: the TaskInstance being run.
    :param session: SQLAlchemy session; ``provide_session`` supplies one when the
        caller omits it. Nothing is recorded if the caller explicitly passes ``None``.
    """
    if session is None:
        return

    finished_states = {State.SUCCESS, State.FAILED}
    dag_run = task_instance.get_dagrun(session)
    finished_tis = dag_run.get_task_instances(state=finished_states, session=session)

    for finished_ti in finished_tis:
        crumb_data = {field: getattr(finished_ti, field) for field in SCOPE_CRUMBS}
        sentry_sdk.add_breadcrumb(category="completed_tasks", data=crumb_data, level="info")
@provide_session
def sentry_run_raw_task(task_instance: TaskInstance, *args, session: Session = None, **kwargs):
    """
    Run the original ``TaskInstance._run_raw_task`` inside an isolated Sentry scope.

    ``KlaviyoSentryHook`` installs this function as ``TaskInstance._run_raw_task``.

    * Tags and breadcrumbs for ``task_instance`` are attached to a scope pushed for
      the duration of the run, so they do not leak onto other task instances.
    * Attaching them is best-effort: a failure is logged and the task still runs.
    * An exception raised by the task is reported to Sentry and then re-raised unchanged.

    :param task_instance: the TaskInstance being run (``self`` of the original method).
    :param args: positional arguments forwarded to the original ``_run_raw_task``.
    :param session: SQLAlchemy session forwarded to the original ``_run_raw_task``.
        ``provide_session`` supplies one when the caller omits it.
    :param kwargs: keyword arguments forwarded to the original ``_run_raw_task``.
    :return: whatever the original ``_run_raw_task`` returns.
    """
    # Avoid leaking tags by using push_scope.
    with sentry_sdk.push_scope():
        # add_breadcrumbs is not given ``session``, so provide_session opens a separate
        # one. A failed query there cannot leave the task's own session in a broken
        # transaction.
        for enrich in (add_tagging, add_breadcrumbs):
            try:
                enrich(task_instance)
            except Exception:
                # Monitoring must never prevent the task itself from running.
                logging.getLogger(__name__).warning(
                    "Could not attach Sentry context for %s via %s",
                    task_instance,
                    enrich.__name__,
                    exc_info=True,
                )
        try:
            # Forward the result so this wrapper is a transparent replacement.
            return original_run_raw_task(task_instance, *args, session=session, **kwargs)
        except Exception as e:
            sentry_sdk.capture_exception(e)
            raise
class KlaviyoSentryHook(BaseHook):
    """
    Wrap around the Sentry SDK.

    Instantiating this hook has process-wide side effects:

    * Sentry's logging integration ignores the ``airflow.task`` and
      ``airflow.jobs.backfill_job.BackfillJob`` loggers.
    * ``sentry_sdk.init`` is called with the Flask and SQLAlchemy integrations, plus
      Celery when ``[core] EXECUTOR`` is ``CeleryExecutor``.
    * The DSN is the ``host`` of the ``sentry_conn_id`` Airflow connection. If that
      connection cannot be read, no DSN is passed and the SDK's environment-variable
      default applies.
    * ``TaskInstance._run_raw_task`` is replaced with ``sentry_run_raw_task``, and
      ``TaskInstance._sentry_integration_`` is set to ``True``.
    """

    def __init__(self, sentry_conn_id="sentry_dsn"):
        """
        :param sentry_conn_id: id of the Airflow connection whose ``host`` holds the Sentry DSN.
        """
        super().__init__()
        # NOTE(review): presumably ignored because task failures are already reported by
        # sentry_run_raw_task, which would otherwise produce duplicate events -- confirm.
        ignore_logger("airflow.task")
        ignore_logger("airflow.jobs.backfill_job.BackfillJob")

        integrations = [FlaskIntegration(), SqlalchemyIntegration()]
        if conf.get("core", "EXECUTOR") == "CeleryExecutor":
            # Imported lazily: only needed when Celery is the configured executor.
            from sentry_sdk.integrations.celery import CeleryIntegration

            integrations.append(CeleryIntegration())

        try:
            conn = self.get_connection(sentry_conn_id)
        except (AirflowException, exc.OperationalError, exc.ProgrammingError):
            # e.g. the connection does not exist, or the metadata DB cannot be queried.
            self.log.debug("Sentry defaulting to environment variable.")
            sentry_sdk.init(integrations=integrations)
        else:
            sentry_sdk.init(
                dsn=conn.host,
                integrations=integrations,
            )

        TaskInstance._run_raw_task = sentry_run_raw_task
        TaskInstance._sentry_integration_ = True
# Capture the unpatched TaskInstance._run_raw_task exactly once per process and keep it on
# the class itself. The module may be imported a second time (importlib.reload, or loaded
# under another module name). KlaviyoSentryHook() then installs that copy's
# sentry_run_raw_task, which needs a valid ``original_run_raw_task`` global. Reading it back
# from the class avoids a NameError on every task run, and the wrapper never wraps itself.
if not hasattr(TaskInstance, "_sentry_original_run_raw_task"):
    TaskInstance._sentry_original_run_raw_task = TaskInstance._run_raw_task
original_run_raw_task = TaskInstance._sentry_original_run_raw_task
KlaviyoSentryHook()
# To comment on the original gist, sign in to GitHub (or sign up for free).