Skip to content

Instantly share code, notes, and snippets.

@alanhamlett
Last active October 22, 2021 21:18
Show Gist options
  • Star 9 You must be signed in to star a gist
  • Fork 2 You must be signed in to fork a gist
  • Save alanhamlett/dc8cdd4721ea63053f14 to your computer and use it in GitHub Desktop.
Save alanhamlett/dc8cdd4721ea63053f14 to your computer and use it in GitHub Desktop.
send an error email when a Celery worker raises an unhandled exception
# -*- coding: utf-8 -*-
"""
wakatime.amqp
~~~~~~~~~~~~~
Setup for Celery distributed task queue.
"""
import socket
import traceback
import uuid
from celery import Celery
from celery.utils.log import get_task_logger
from functools import wraps
from kombu.serialization import registry
try:
from celery import setup_security
except ImportError:
from celery.security import setup_security
from flask import json
from wakatime import app, emailer
from wakatime.compat import u
class RequestMock(object):
    """Lightweight stand-in for a task request, carrying only ``id`` and ``name``."""

    # Class-level defaults; an instance attribute is only set when a truthy
    # value is supplied to the constructor.
    id = None
    name = None

    def __init__(self, id=None, name=None):
        """Store ``id`` and ``name`` on the instance when they are truthy."""
        for attr, value in (('id', id), ('name', name)):
            if value:
                setattr(self, attr, value)
class TaskMock(object):
    """Synchronous stand-in for a Celery task.

    Supports both decorator forms:

    * ``@celery.task`` -- constructed with the function as the only
      positional argument.
    * ``@celery.task(**options)`` -- constructed with the options, then
      called once with the function to produce the actual task.

    ``delay`` / ``apply_async`` run the wrapped function immediately and
    return a ``ResultMock`` holding its return value.
    """

    # Class-level defaults, kept so attributes exist even on instances
    # created without running ``__init__``.
    fn = None
    id = None
    name = None
    args = ()
    kwargs = {}

    def __init__(self, *args, **kwargs):
        """Wrap a function (single positional argument) or record options."""
        # ``u`` comes from wakatime.compat; presumably it converts the UUID
        # to text -- confirm against that helper.
        self.id = u(uuid.uuid4())
        # Give every instance its own containers so nothing can mutate the
        # dict shared at class level.
        self.args = ()
        self.kwargs = {}
        if len(args) == 1 and not kwargs:
            self.fn = args[0]
            self.name = args[0].__name__
        else:
            self.args = args
            self.kwargs = kwargs

    def _run(self, args, kwargs):
        """Invoke the wrapped function and wrap its return value.

        When the task was declared with ``bind=True`` the task object is
        passed as the first positional argument.
        """
        result = ResultMock(id=self.id, name=self.name)
        if self.kwargs.get('bind'):
            result.result = self.fn(self, *args, **kwargs)
        else:
            result.result = self.fn(*args, **kwargs)
        return result

    def delay(self, *args, **kwargs):
        """Run the task now, after a JSON round-trip of its arguments."""
        args, kwargs = self._jsonify_args(*args, **kwargs)
        return self._run(args, kwargs)

    def apply_async(self, args=None, kwargs=None, *meta_args, **meta_kwargs):
        """Run the task now with ``args`` / ``kwargs``.

        ``args`` may be a list or a tuple (previously tuples were silently
        dropped); anything else is treated as "no positional arguments".
        ``kwargs`` must be a dict or it is ignored. Extra positional and
        keyword options are accepted and ignored.
        """
        if not isinstance(args, (list, tuple)):
            args = ()
        if not isinstance(kwargs, dict):
            kwargs = {}
        return self.delay(*args, **kwargs)

    def _jsonify_args(self, *args, **kwargs):
        """Return ``(args, kwargs)`` after a JSON dump/load of every value.

        The round-trip makes values look the way they would after being
        serialized (e.g. tuples come back as lists).
        """
        converted_args = tuple(json.loads(json.dumps(arg)) for arg in args)
        converted_kwargs = dict(
            (key, json.loads(json.dumps(val))) for key, val in kwargs.items()
        )
        return converted_args, converted_kwargs

    @property
    def request(self):
        """A ``RequestMock`` carrying this task's id and name."""
        return RequestMock(id=self.id, name=self.name)

    def __call__(self, *args, **kwargs):
        """Finish decoration, or run the task directly.

        Without a wrapped function yet (options form), ``args[0]`` is the
        function being decorated: build the real task and copy the stored
        options onto it. Otherwise run the function with the arguments as
        given (no JSON round-trip), honouring ``bind`` like ``delay`` does.
        """
        if not self.fn:
            task = TaskMock(args[0])
            task.args = self.args
            task.kwargs = self.kwargs
            return task
        return self._run(args, kwargs)
class ResultMock(object):
    """Already-finished task result holding a value in ``result``."""

    # Class-level defaults; ``result`` is assigned by whoever ran the task.
    id = None
    name = None
    result = None

    def __init__(self, id=None, name=None):
        """Store ``id`` and ``name`` on the instance when they are truthy."""
        supplied = (('id', id), ('name', name))
        self.__dict__.update(
            dict((attr, value) for attr, value in supplied if value)
        )

    def ready(self):
        """Always report the result as ready."""
        return True

    def failed(self):
        """Always report the result as not failed."""
        return False

    def forget(self):
        """Do nothing."""
        return None

    def get(self, *args, **kwargs):
        """Return the stored result; all arguments are ignored."""
        return self.result

    def wait(self, *args, **kwargs):
        """Return the stored result; all arguments are ignored."""
        return self.result

    @property
    def info(self):
        """The stored result."""
        return self.result
class CeleryMock(object):
    """Minimal replacement for a Celery app that exposes only ``task``."""

    def __init__(self):
        """Expose ``TaskMock`` as the ``task`` decorator."""
        # ``@celery.task`` and ``@celery.task(...)`` both end up constructing
        # a TaskMock.
        self.task = TaskMock
# Register 'json' with kombu's serializer registry so that payloads are
# encoded and decoded with the ``json`` module imported by this file.
registry.register(
    'json',
    json.dumps,
    json.loads,
    content_type='application/json',
    content_encoding='utf-8',
)
def make_celery(app):
    """Build the Celery application (or its mock) for a Flask ``app``.

    When ``app.config['MOCK_CELERY']`` is truthy a ``CeleryMock`` is
    returned. Otherwise a real ``Celery`` instance is created from the app's
    import name and ``CELERY_BROKER_URL``, configured from ``app.config``,
    and given a task base class whose ``__call__`` runs inside
    ``app.app_context()``.
    """
    if app.config['MOCK_CELERY']:
        return CeleryMock()

    celery = Celery(app.import_name, broker=app.config['CELERY_BROKER_URL'])
    celery.conf.update(app.config)
    if app.config['BROKER_USE_SSL']:
        setup_security()

    base_task = celery.Task

    class ContextTask(base_task):
        abstract = True

        def __call__(self, *args, **kwargs):
            # Run the task body with the Flask application context pushed.
            with app.app_context():
                return base_task.__call__(self, *args, **kwargs)

    celery.Task = ContextTask
    return celery
def _report_task_failure(fn, args, kwargs, formatted_exc):
    """Email the failure details for ``fn``; never raise.

    A broken emailer must not hide the task's real error, so any exception
    raised while sending is logged and swallowed. This runs in its own
    function so that, on Python 2, an exception handled here cannot become
    the one re-raised by the caller's bare ``raise``.
    """
    try:
        send_error_email(
            getattr(fn, '__name__', repr(fn)),
            args,
            kwargs,
            socket.gethostname(),
            formatted_exc,
        )
    except Exception:
        logger.exception('Failed to send celery error email')


def email_if_fails(fn):
    """Decorator: email the admin when ``fn`` raises, then re-raise.

    The email is skipped when ``app.config['DEBUG']`` is truthy. Only
    ``Exception`` subclasses are reported, so ``KeyboardInterrupt`` and
    ``SystemExit`` pass through without sending mail. The original exception
    always propagates to the caller.
    """

    @wraps(fn)
    def decorated(*args, **kwargs):
        try:
            return fn(*args, **kwargs)
        except Exception:
            if not app.config['DEBUG']:
                # Format first, while the task's exception is the one being
                # handled.
                _report_task_failure(fn, args, kwargs, traceback.format_exc())
            raise

    return decorated
def send_error_email(fnName, args, kwargs, host, formatted_exc):
    """Email the task failure report to ``app.config['ADMIN_EMAIL']``.

    The body lists the task name, its arguments, the host and the stripped
    traceback text. The subject is tagged ``[celery-error]`` and ends with
    the last line of that traceback text.
    """
    trace = formatted_exc.strip()

    body_format = u(
        'Task: {fnName}\nArgs: {args}\nKwargs: {kwargs}\n'
        'Host: {host}\nError: {error}'
    )
    body = body_format.format(
        fnName=fnName,
        args=args,
        kwargs=kwargs,
        host=host,
        error=trace,
    )

    # Everything after the final newline, i.e. the last line of the trace.
    last_line = trace.rpartition('\n')[-1]
    subject = '[celery-error] {host} {fnName} {short_exc}'.format(
        host=host,
        fnName=fnName,
        short_exc=last_line,
    )

    emailer.send_email(
        subject=subject,
        to=app.config['ADMIN_EMAIL'],
        template='noop',
        template_vars={'contents': body},
    )
# Module-level Celery application (or its mock, depending on the app config),
# built once at import time.
celery = make_celery(app)
# Logger obtained from Celery's task-logger helper, named after this module.
logger = get_task_logger(__name__)
# -*- coding: utf-8 -*-
"""
wakatime.tasks
~~~~~~~~~~~~~~
Background celery worker tasks.
"""
from wakatime.amqp import celery, email_if_fails
@celery.task
@email_if_fails
def my_task():
    """Example background task wrapped by ``email_if_fails``."""
    # do some background task
    return None
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment