Skip to content

Instantly share code, notes, and snippets.

@emmettbutler
Last active March 22, 2023 17:40
Show Gist options
  • Save emmettbutler/49c1e8d26853b00c6aeb5c87e3aadc2b to your computer and use it in GitHub Desktop.
Save emmettbutler/49c1e8d26853b00c6aeb5c87e3aadc2b to your computer and use it in GitHub Desktop.
A minimal example of how to use dd-trace-py v1.9.3 to trace celery.beat and redbeat scheduling functionality
import logging
import time
import celery
from ddtrace import Pin
from ddtrace import config
# this should be omitted if running celery under ddtrace-run
import ddtrace.bootstrap.sitecustomize # noqa
from ddtrace.constants import ANALYTICS_SAMPLE_RATE_KEY
from ddtrace.constants import SPAN_KIND
from ddtrace.constants import SPAN_MEASURED_KEY
from ddtrace.contrib import trace_utils
from ddtrace.ext import SpanKind
from ddtrace.ext import SpanTypes
from ddtrace.vendor import wrapt
try:
import redbeat
except ImportError:
redbeat = None
log = logging.getLogger()
REDIS_URL = "redis://127.0.0.1:{port}".format(port=6379)
BROKER_URL = "{redis}/{db}".format(redis=REDIS_URL, db=0)
BACKEND_URL = "{redis}/{db}".format(redis=REDIS_URL, db=1)
app = celery.Celery(broker=BROKER_URL, backend=BACKEND_URL)
app.conf.beat_schedule = {"do-it": {"task": "my_celery_test.mytask", "schedule": 5, "args": (time.time(),)}}
# custom instrumentation for celery.beat and redbeat
app.conf.broker_connection_max_retries = 3
def _traced_beat_function(integration_config, fn_name, scheduler_module_name="celery.beat", resource_fn=None):
def _traced_beat_inner(func, instance, args, kwargs):
pin = Pin.get_from(instance)
if not pin or not pin.enabled():
return func(*args, **kwargs)
with pin.tracer.trace(
"{}.{}".format(scheduler_module_name, fn_name),
span_type=SpanTypes.WORKER,
service=trace_utils.ext_service(pin, integration_config),
) as span:
if resource_fn:
span.resource = resource_fn(args)
span.set_tag_str(SPAN_KIND, SpanKind.PRODUCER)
rate = config.celery.get_analytics_sample_rate()
if rate is not None:
span.set_tag(ANALYTICS_SAMPLE_RATE_KEY, rate)
span.set_tag(SPAN_MEASURED_KEY)
return func(*args, **kwargs)
return _traced_beat_inner
wrapt.wrap_function_wrapper(
"celery.beat",
"Scheduler.apply_entry",
_traced_beat_function(config.celery, "apply_entry", resource_fn=lambda args: args[0].name),
)
wrapt.wrap_function_wrapper(
"celery.beat",
"Scheduler.tick",
_traced_beat_function(
config.celery,
"tick",
),
)
Pin().onto(celery.beat.Scheduler)
if redbeat:
scheduler_module_name = "redbeat.schedulers"
wrapt.wrap_function_wrapper(
"redbeat.schedulers",
"RedBeatScheduler.maybe_due",
_traced_beat_function(
config.celery,
"maybe_due",
scheduler_module_name=scheduler_module_name,
resource_fn=lambda args: args[0].name,
),
)
wrapt.wrap_function_wrapper(
"redbeat.schedulers",
"RedBeatScheduler.tick",
_traced_beat_function(
config.celery,
"tick",
scheduler_module_name=scheduler_module_name,
),
)
Pin().onto(redbeat.schedulers.RedBeatScheduler)
@app.task
def mytask(n):
return f"foobar {n}"
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment