Skip to content

Instantly share code, notes, and snippets.

@thulasi-ram
Created October 13, 2017 22:41
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save thulasi-ram/0b39152456c957680bf9876efa979971 to your computer and use it in GitHub Desktop.
Save thulasi-ram/0b39152456c957680bf9876efa979971 to your computer and use it in GitHub Desktop.
A worker script that resizes itself based on number of messages in rabbitmq. This is based on Gevent. Make sure you profile your throughput before settling on a logic with size.
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
gevent expandable queue consumer
"""
import sys
from gevent import monkey
monkey.patch_all()
import json
import requests
from gevent.pool import Group
from kombu import Exchange, Queue, Connection
from kombu.mixins import ConsumerMixin
mc = sys.maxsize # assigning to max possible integer
def on_queue_declare(*args):
global mc
mc = min(args[1], mc)
class Worker(ConsumerMixin):
exchnge = Exchange("test-exchange", type='topic', durable=True)
task_queue = Queue("test-queue", exchange=exchnge, routing_key="test.#", durable=True, on_declared=on_queue_declare)
def __init__(self, connection):
self.connection = connection
def get_consumers(self, Consumer, channel):
return [Consumer(queues=[self.task_queue], callbacks=[self.on_task], prefetch_count=1)]
def on_task(self, body, message):
body = json.loads(body)
# print '\nstarting request {b}'.format(b=body)
requests.get(url='http://127.0.0.1:8000/?query=test&idd={b}'.format(b=body['id']))
# print 'ending request {b}\n'.format(b=body)
message.ack()
self.should_stop = True
if __name__ == '__main__':
global mc
with Connection(hostname='amqp://admin:admin@localhost') as conn:
def g_worker():
worker = Worker(connection=conn)
worker.run()
while True:
size = int(str(mc)[:1])
group = Group()
[group.spawn(g_worker) for g in range(0, size)]
print ('spawning {n} greenlets'.format(n=len(group)))
res = group.join()
print ('ending mc: {mc}'.format(mc=mc))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment