Skip to content

Instantly share code, notes, and snippets.

@namoshizun
Created March 26, 2022 14:10
Show Gist options
  • Save namoshizun/27c8bef46ffac2e6015c3e23d1cd0d99 to your computer and use it in GitHub Desktop.
Save namoshizun/27c8bef46ffac2e6015c3e23d1cd0d99 to your computer and use it in GitHub Desktop.
import time
import random
from elasticapm import Client, instrument
from elasticapm.base import get_client as get_apm_client
from elasticapm.contrib.celery import register_exception_tracking, register_instrumentation
from elasticapm.traces import execution_context
from celery import Celery
from celery.canvas import group
app = Celery()
app.conf.broker_url = 'redis://:password@localhost:6379/0'
app.conf.result_backend = 'redis://:password@localhost:6379/1'
app.conf.result_expires = 30
if not get_apm_client():
instrument()
apm_client = Client(
service_name='demo',
service_url='http://localhost:8200',
span_compression_enabled=True,
span_compression_exact_match_max_duration="50ms",
span_compression_same_kind_max_duration="1ms",
)
register_exception_tracking(apm_client)
register_instrumentation(apm_client)
def _poll_task_result(result, interval=0.3, timeout=10):
timer = 0
while timer < timeout and not result.ready():
time.sleep(interval)
timer += interval
return result
@app.task(name='do_heavy_lifting')
def do_heavy_lifting():
print(f'Doing some expensive computation')
time.sleep(2 * random.random())
print('Done!')
return 42
@app.task
def submit_task_for_computation():
# do something before
# ...
# result = app.send_task('do_heavy_lifting')
result = app.send_task('do_heavy_lifting', headers={
'elasticapm': {
'parent_span_id': execution_context.get_transaction().id
}
})
_poll_task_result(result)
# consume the result
# ...
print(result.result)
@app.task
def run_complex_workflow():
signature = group(
submit_task_for_computation.s(),
submit_task_for_computation.s(),
)
signature.delay()
# celery -A app worker
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment