Django management command for Celery monitoring
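The management command below subscribes to the Celery event stream and writes a JSON status file while any task keeps failing; the Docker Swarm healthcheck further down reports the monitor service unhealthy as long as that file exists, so swarm-health-alerter can pick it up.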
# See https://github.com/brablc/swarm-health-alerter/blob/main/README.md#alerting-for-any-service
# Place in adm/management/commands/celery_monitor.py (replace "adm" with your main application, here and in the code below)
import json
import logging
import os

from celery import Celery
from celery.events import EventReceiver
from django.core.management.base import BaseCommand

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "adm.settings")

app = Celery("adm")
app.config_from_object("django.conf:settings", namespace="CELERY")


class Command(BaseCommand):
    help = "Monitor Celery events - detect repeatedly failing tasks."

    status_file: str
    failed_retries: int

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        logging.basicConfig(level=logging.INFO)
        self.logger = logging.getLogger(__name__)
        self.task_info = {}  # uuid -> task name, filled from task-received events
        self.failed_tasks = {}  # task name -> {"count": int, "event": dict}

    def add_arguments(self, parser):
        parser.add_argument(
            "--status-file",
            type=str,
            required=True,
            help="Name of the file for writing error output",
        )
        parser.add_argument(
            "--failed-retries",
            type=int,
            default=3,
            help="Report a task in the status file once it reaches this many consecutive failures",
        )

    def handle(self, *args, **kwargs):
        self.status_file = kwargs.get("status_file")
        self.failed_retries = kwargs.get("failed_retries")
        self.logger.info(f"Status file: {self.status_file}")
        self.capture_events()

    def update_status(self):
        # Write details of tasks at or above the failure threshold; remove the
        # file entirely once every task is healthy again.
        status_data = {}
        for task_name in self.failed_tasks:
            count = self.failed_tasks[task_name]["count"]
            if count < self.failed_retries:
                continue
            event = self.failed_tasks[task_name]["event"]
            status_data[task_name] = {
                "failed_retries": count,
                "last_failure": {
                    "timestamp": event.get("timestamp", ""),
                    "exception": event.get("exception", ""),
                    "traceback": event.get("traceback", ""),
                    "args": event.get("args", []),
                    "kwargs": event.get("kwargs", {}),
                },
            }
        if status_data:
            with open(self.status_file, "w") as f:
                json.dump(status_data, f, indent=2, default=str)
        else:
            if os.path.exists(self.status_file):
                os.remove(self.status_file)
                self.logger.info("All tasks healthy")

    def process_event(self, event):
        event_type = event["type"]
        if "uuid" not in event:
            # Worker events (heartbeats etc.) carry no task uuid.
            return
        task_uuid = event["uuid"]
        if event_type == "task-received":
            # Only task-received events carry the task name; remember it for
            # the succeeded/failed events that follow.
            self.task_info[task_uuid] = event["name"]
            return
        if event_type not in ("task-succeeded", "task-failed"):
            return
        if task_uuid not in self.task_info:
            self.logger.warning(f"Task {task_uuid} not found in task_info")
            return
        task_name = self.task_info[task_uuid]
        if event_type == "task-succeeded":
            if task_name in self.failed_tasks:
                del self.failed_tasks[task_name]
                self.logger.info(f"Task {task_name} succeeded")
                self.update_status()
        elif event_type == "task-failed":
            if task_name in self.failed_tasks:
                self.failed_tasks[task_name]["count"] += 1
                self.logger.warning(f"Task {task_name} failed again ({self.failed_tasks[task_name]['count']})")
            else:
                self.failed_tasks[task_name] = {"count": 1}
                self.logger.error(f"Task {task_name} started failing")
            self.failed_tasks[task_name]["event"] = event
            self.update_status()
        del self.task_info[task_uuid]

    def capture_events(self):
        # Subscribe to the broker's event stream; workers must run with --events.
        with app.connection() as connection:
            receiver = EventReceiver(
                connection,
                handlers={
                    "*": self.process_event,
                },
            )
            receiver.capture(limit=None, timeout=None)
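
For a local check you can run the command directly; the invocation and the task name in the sample below are hypothetical. While any task is at or above the threshold, the status file contains JSON shaped by update_status() above:

python web/manage.py celery_monitor --status-file=/tmp/celery-status.json --failed-retries=3

# Sample /tmp/celery-status.json (illustrative values):
{
  "adm.tasks.sync_invoices": {
    "failed_retries": 3,
    "last_failure": {
      "timestamp": 1720533000.123,
      "exception": "ValueError('boom')",
      "traceback": "Traceback (most recent call last): ...",
      "args": [],
      "kwargs": {}
    }
  }
}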
# I use a trick to get dockerize into each service without having to change the images:
# I distribute dockerize to all my swarm nodes under /var/lib/swarm and bind-mount it into each container.
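
One way to get dockerize onto every node (a sketch; version v0.7.0 is an example, and the compose file below expects the binary at /var/lib/swarm/bin/dockerize, visible in the container as /swarm/bin/dockerize):

# Run once on each swarm node
mkdir -p /var/lib/swarm/bin
curl -fsSL https://github.com/jwilder/dockerize/releases/download/v0.7.0/dockerize-linux-amd64-v0.7.0.tar.gz \
    | tar -xz -C /var/lib/swarm/bin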
services:
  postgres:
    ...
  rabbitmq:
    ...
  celery-monitor:
    image: your-app-django:latest
    tty: true
    hostname: '{{.Node.Hostname}}'
    networks:
      - your-app-network
    deploy:
      replicas: 1
      placement:
        constraints:
          - node.role == manager
    environment:
      STATUS_FILE: /tmp/celery-status.json
    secrets:
      - source: your-app.env
        target: /app/.env
    # $$ escapes $ so the variable is expanded by the container shell, not by compose
    command: /swarm/bin/dockerize -wait tcp://rabbitmq:5672 sh -c 'python web/manage.py celery_monitor --status-file=$${STATUS_FILE} --failed-retries=3'
    healthcheck:
      # Healthy while the status file is absent; once the monitor writes it,
      # cat exposes the failure details in the health log and the check fails.
      test: ["CMD-SHELL", "test -f $${STATUS_FILE} || exit 0; cat $${STATUS_FILE}; exit 1"]
      interval: 10s
      timeout: 1s
      retries: 9999
      start_period: 10s
    volumes:
      - /var/lib/swarm:/swarm
  celery-worker:
    image: your-app-django:latest
    tty: true
    hostname: '{{.Node.Hostname}}'
    networks:
      - your-app-network
    deploy:
      replicas: 2
      placement:
        constraints:
          - node.role == worker
      update_config:
        delay: 10s
    secrets:
      - source: your-app.env
        target: /app/.env
    command: /swarm/bin/dockerize -wait tcp://rabbitmq:5672 -wait tcp://postgres:5432 celery --workdir web -A adm worker --loglevel=info --events
    volumes:
      - /var/lib/swarm:/swarm
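
Deployment is then a normal stack deploy (the stack and file names below are examples). Because the healthcheck cats the status file, the failing tasks are visible in the container's health log on the manager node:

docker stack deploy -c docker-compose.yml your-app
docker inspect --format '{{json .State.Health}}' $(docker ps -q -f name=your-app_celery-monitor)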