Created
October 13, 2017 22:41
-
-
Save thulasi-ram/0b39152456c957680bf9876efa979971 to your computer and use it in GitHub Desktop.
A worker script that resizes its pool of greenlets based on the number of messages waiting in RabbitMQ. It is built on gevent. Profile your throughput before settling on the sizing logic.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# -*- coding: utf-8 -*- | |
""" | |
gevent expandable queue consumer | |
""" | |
import sys | |
from gevent import monkey | |
monkey.patch_all() | |
import json | |
import requests | |
from gevent.pool import Group | |
from kombu import Exchange, Queue, Connection | |
from kombu.mixins import ConsumerMixin | |
# Smallest value handed to on_queue_declare so far. It starts at sys.maxsize
# (a very large placeholder) so that the first value passed to the callback
# always replaces it. The __main__ loop reads the leading decimal digit of
# this number to decide how many greenlets to spawn.
mc = sys.maxsize
def on_queue_declare(*args):
    """Lower the module-level ``mc`` to the count reported for a declared queue.

    Installed as the ``on_declared`` callback of ``Worker.task_queue``. Only
    the second positional argument is used; kombu documents the callback
    arguments as ``(name, messages, consumers)``, so this is taken to be the
    number of messages in the queue.

    ``mc`` is only ever lowered here, never raised.
    """
    global mc
    message_count = args[1]
    # Keep the current value only when it is strictly smaller.
    mc = mc if mc < message_count else message_count
class Worker(ConsumerMixin):
    """Consumer that handles a single message from ``test-queue`` and stops.

    Each instance consumes with ``prefetch_count=1``. After one message has
    been processed and acknowledged, ``should_stop`` is set so that
    ``ConsumerMixin.run()`` returns and the caller can decide how many
    workers to start next.
    """

    # Durable topic exchange the queue is bound to. The attribute keeps its
    # original spelling so that existing references to it continue to work.
    exchnge = Exchange("test-exchange", type='topic', durable=True)

    # Durable queue receiving every routing key that matches "test.#".
    # on_queue_declare is invoked once the queue declaration completes.
    task_queue = Queue(
        "test-queue",
        exchange=exchnge,
        routing_key="test.#",
        durable=True,
        on_declared=on_queue_declare,
    )

    def __init__(self, connection):
        """Store the broker connection that ``ConsumerMixin`` will use."""
        self.connection = connection

    def get_consumers(self, Consumer, channel):
        """Return the single consumer for ``task_queue``.

        ``prefetch_count=1`` limits the worker to one unacknowledged message
        at a time.
        """
        return [
            Consumer(
                queues=[self.task_queue],
                callbacks=[self.on_task],
                prefetch_count=1,
            )
        ]

    def on_task(self, body, message):
        """Process one message: call the local HTTP endpoint, ack, then stop.

        ``body`` may be a JSON document (text or bytes) or a dict that kombu
        has already deserialized; it must provide an ``'id'`` key.

        Any exception raised while parsing or performing the request
        propagates before ``message.ack()`` is reached, so the message is
        left unacknowledged.
        """
        # kombu decodes the payload according to the message content type, so
        # a JSON message may already arrive as a dict. Parsing only when it is
        # not a dict avoids a TypeError from json.loads in that case.
        if not isinstance(body, dict):
            body = json.loads(body)

        # NOTE(review): no timeout is passed, so a stalled endpoint blocks
        # this greenlet indefinitely -- consider adding one.
        requests.get(url='http://127.0.0.1:8000/?query=test&idd={b}'.format(b=body['id']))
        message.ack()

        # Ask ConsumerMixin.run() to return after this message.
        self.should_stop = True
if __name__ == '__main__':
    # No ``global mc`` statement here: this code already runs at module scope,
    # and declaring a name global after it has been assigned in the same scope
    # is a SyntaxError on Python 3.6+.
    with Connection(hostname='amqp://admin:admin@localhost') as conn:
        def g_worker():
            """Run one Worker until it has processed a single message."""
            worker = Worker(connection=conn)
            worker.run()

        while True:
            # The pool size is the leading decimal digit of ``mc`` (so at most
            # 9). Clamp it to at least 1: with zero greenlets join() returns
            # immediately and the loop would spin forever without declaring
            # the queue again.
            size = max(1, int(str(mc)[:1]))
            group = Group()
            for _ in range(size):
                group.spawn(g_worker)
            print('spawning {n} greenlets'.format(n=len(group)))
            # Block until every worker in this round has finished.
            group.join()
            print('ending mc: {mc}'.format(mc=mc))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment