Skip to content

Instantly share code, notes, and snippets.

@zircote
Last active March 6, 2022 22:05
Show Gist options
  • Star 4 You must be signed in to star a gist
  • Fork 3 You must be signed in to fork a gist
  • Save zircote/7834009 to your computer and use it in GitHub Desktop.
Save zircote/7834009 to your computer and use it in GitHub Desktop.
AWS/Celery Monitor
from boto.ec2.cloudwatch import CloudWatchConnection
from datetime import datetime
from main.settings import AWS_CREDENTIALS, CLOUDWATCH_NAMESPACE
def monitor(app):
    """Forward Celery task events to AWS CloudWatch as metric data.

    Opens a broker connection from ``app`` and blocks inside
    ``Receiver.capture`` (no limit, no timeout), publishing one CloudWatch
    datum per handled event:

    * ``task-received``, ``task-started``, ``task-failed``: value ``1`` with
      unit ``Count``.
    * ``task-succeeded``: the task runtime with unit ``Milliseconds``.

    Every datum is written to ``CLOUDWATCH_NAMESPACE`` under the metric name
    ``task.type`` with ``hostname`` and ``task`` dimensions.

    :param app: Celery application providing ``events`` and ``connection()``.
    """
    cloudwatch = CloudWatchConnection(**AWS_CREDENTIALS)
    state = app.events.State()

    def get_task(event):
        """
        Feed ``event`` into the shared state and return the task it refers to.

        :rtype: celery.events.state.Task
        """
        state.event(event)
        # task name is sent only with -received event, and state
        # will keep track of this for us.
        return state.tasks.get(event['uuid'])

    def put_metric(task, value, unit):
        """Publish a single datum for ``task`` to CloudWatch."""
        dimensions = {
            'hostname': task.hostname,
            'task': task.name,
        }
        cloudwatch.put_metric_data(
            CLOUDWATCH_NAMESPACE,
            task.type,
            value=value,
            # CloudWatch timestamps are UTC; a naive local-time datetime
            # would be shifted by the host's UTC offset.
            timestamp=datetime.utcfromtimestamp(task.timestamp),
            unit=unit,
            dimensions=dimensions,
        )

    def on_task_succeeded(event):
        """Report the runtime of a finished task in milliseconds."""
        task = get_task(event)
        # Skip tasks whose -received event we never saw (no name), and
        # events that carry no runtime to report.
        if task is None or task.name is None or task.runtime is None:
            return
        # ``runtime`` is a duration in seconds, so scale it directly.
        # Reading it through a datetime's second/microsecond fields would
        # wrap every runtime of a minute or longer.
        put_metric(task, task.runtime * 1000.0, 'Milliseconds')

    def on_task_counted(event):
        """Report one occurrence of a received/started/failed event."""
        task = get_task(event)
        if task is None or task.name is None:
            return
        put_metric(task, 1, 'Count')

    with app.connection() as connection:
        recv = app.events.Receiver(connection, handlers={
            'task-succeeded': on_task_succeeded,
            'task-received': on_task_counted,
            'task-started': on_task_counted,
            'task-failed': on_task_counted,
        })
        recv.capture(limit=None, timeout=None, wakeup=True)
if __name__ == '__main__':
    # Script entry point: load the project's Celery application only when
    # executed directly, then hand it to the monitor.
    from main.celery import app as celery_app

    monitor(celery_app)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment