Skip to content

Instantly share code, notes, and snippets.

@zircote

zircote/monitor.py

Last active Mar 14, 2019
Embed
What would you like to do?
AWS/Celery Monitor
from boto.ec2.cloudwatch import CloudWatchConnection
from datetime import datetime
from main.settings import AWS_CREDENTIALS, CLOUDWATCH_NAMESPACE
def monitor(app):
    """Forward Celery task events to CloudWatch as metric data.

    Listens for ``task-received``, ``task-started``, ``task-succeeded`` and
    ``task-failed`` events on the broker connection of *app* and, for every
    event whose task name is known, publishes one datum to
    ``CLOUDWATCH_NAMESPACE``.  The metric name is ``task.type`` and each
    datum carries ``hostname`` and ``task`` dimensions.

    * received / started / failed: value ``1``, unit ``Count``
    * succeeded: the task runtime, unit ``Milliseconds``

    :param app: Celery application providing ``events`` and ``connection()``.
    :returns: ``None``.  ``capture`` is started with no limit and no timeout.
    """
    cloudwatch = CloudWatchConnection(**AWS_CREDENTIALS)
    state = app.events.State()

    def get_task(event):
        """Record *event* in the state tracker and return its task.

        :rtype: celery.events.state.Task
        """
        state.event(event)
        # task name is sent only with -received event, and state
        # will keep track of this for us.
        return state.tasks.get(event['uuid'])

    def put_metric(task, value, unit):
        """Publish a single datum for *task* with the shared dimensions."""
        dimensions = {
            'hostname': task.hostname,
            'task': task.name,
        }
        # The datetime is naive; CloudWatch interprets timestamps as UTC, so
        # build it in UTC rather than in the machine's local time zone.
        cloudwatch.put_metric_data(
            CLOUDWATCH_NAMESPACE, task.type, value,
            datetime.utcfromtimestamp(task.timestamp), unit, dimensions)

    def on_task_counted(event):
        """Emit a ``Count`` of 1 for a received/started/failed event."""
        task = get_task(event)
        # ``dict.get`` may yield None; a task without a name cannot be
        # given a meaningful ``task`` dimension, so skip both cases.
        if task is None or task.name is None:
            return
        put_metric(task, 1, 'Count')

    def on_task_succeeded(event):
        """Emit the runtime of a successful task in milliseconds."""
        task = get_task(event)
        if task is None or task.name is None or task.runtime is None:
            return
        # Convert the runtime directly.  Going through
        # ``datetime.fromtimestamp`` and reading only ``second`` and
        # ``microsecond`` dropped whole minutes for runtimes of 60s or more.
        put_metric(task, task.runtime * 1000, 'Milliseconds')

    with app.connection() as connection:
        recv = app.events.Receiver(connection, handlers={
            'task-succeeded': on_task_succeeded,
            'task-received': on_task_counted,
            'task-started': on_task_counted,
            'task-failed': on_task_counted,
        })
        recv.capture(limit=None, timeout=None, wakeup=True)
# Script entry point: start the monitor against the project's Celery app.
if __name__ == '__main__':
    # Imported here, not at module level, so the Celery app is only loaded
    # when this file is executed directly.
    from main.celery import app as celery_app

    monitor(celery_app)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.