Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Simple python script to monitor celery queue processing speed in redis.
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
celery_speed
~~~~~~~~~~~~
Simple tool to see how fast a celery queue in Redis is being consumed.
Usage:
celery_speed --redis-host REDIS_HOST --queue-name QUEUE_NAME
REDIS_HOST - (Optional, default is 'localhost') The redis host.
QUEUE_NAME - (Optional, default is 'celery') The name of the queue.
Example:
$ celery_speed
host: localhost, queue: celery
negative = queue decreasing, positive = queue increasing
1238 total, -15.30 items/sec
"""
import argparse
import redis
import sys
import time
def celery_speed(redis_connection, celery_queue_name, interval=1.0):
    """Display the speed at which items in the celery queue are being consumed.

    Polls the length of the Redis list ``celery_queue_name`` every
    ``interval`` seconds and rewrites a single status line on stdout showing
    the current length and its rate of change since the previous poll
    (negative while the queue shrinks, positive while it grows). The first
    poll only records a baseline, so no rate is shown for it.

    Runs until interrupted with Ctrl-C, at which point it prints
    ``Shutdown`` and terminates the process through ``sys.exit()``.

    :param redis_connection: A connection to redis
    :type redis_connection: redis.StrictRedis
    :param celery_queue_name: Name of celery queue.
    :type celery_queue_name: str or unicode
    :param interval: Seconds to sleep between two polls (default 1).
    :type interval: int or float
    :raises SystemExit: when a KeyboardInterrupt is received.
    """
    last_value, last_time = None, None
    try:
        while True:
            curr_value = redis_connection.llen(celery_queue_name)
            curr_time = time.time()
            # Compare against None rather than relying on truthiness: a queue
            # length of 0 is a perfectly valid previous sample.
            if last_value is not None:
                elapsed = curr_time - last_time
                # Skip the sample if the wall clock did not advance (or was
                # set backwards); the rate would be undefined or misleading.
                if elapsed > 0:
                    speed = (curr_value - last_value) / float(elapsed)
                    # The trailing '\r' returns the cursor to column 0 so the
                    # next status line overwrites this one.
                    sys.stdout.write(
                        '{} total, {:0.02f} items/sec\r'.format(
                            curr_value, speed
                        )
                    )
                    sys.stdout.flush()
            last_value, last_time = curr_value, curr_time
            time.sleep(interval)
    except KeyboardInterrupt:
        # Single-argument print() call behaves the same on Python 2 and 3.
        print('Shutdown')
        sys.exit()
if __name__ == '__main__':
    # Command-line entry point: parse options, print a short banner, then
    # hand over to celery_speed(), which loops until interrupted.
    parser = argparse.ArgumentParser(
        description='Show speed at which celery processing queue items.'
    )
    parser.add_argument(
        '--redis-host', dest='redis_host', default='localhost',
        help='(Optional, default is localhost) The redis host'
    )
    parser.add_argument(
        '--queue-name', dest='queue_name', default='celery',
        help='(Optional, default is celery) The name of the celery queue.'
    )
    args = parser.parse_args()

    # Resolve the fallbacks before printing so the banner reports the values
    # that are actually used (an explicitly empty argument falls back to the
    # default).
    redis_host = args.redis_host or 'localhost'
    celery_queue_name = args.queue_name or 'celery'

    # Single-argument print() calls behave the same on Python 2 and 3.
    print('host: {}, queue: {}'.format(redis_host, celery_queue_name))
    print('negative = queue decreasing, positive = queue increasing')
    print('')

    redis_connection = redis.StrictRedis(redis_host)
    celery_speed(redis_connection, celery_queue_name)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment