# -*- coding: utf-8 -*-
from gevent import monkey
monkey.patch_all()

import logging
import time

from amqp import PreconditionFailed
from gevent.pool import Group
from kombu import Exchange
from kombu import Queue
from kombu.entity import PERSISTENT_DELIVERY_MODE
from kombu.mixins import ConsumerProducerMixin

from exception import MaxRetriesExceeded, RejectToDeadLetterQueue, SkipAcknowledgement

logger = logging.getLogger(__name__)

class AutoScale(object):
    def __init__(self, callback, sizer, interval=10):
        self.interval = interval
        self.callback = callback
        self.sizer = sizer
        # thread = threading.Thread(target=self.run, args=())
        # thread.daemon = True
        # thread.start()

    def run(self):
        """Resize the greenlet pool towards the sizer's target, forever."""
        pool = Group()
        while True:
            size = self.sizer.get()
            diff = size - len(pool)
            scale = self.grow if diff >= 0 else self.shrink
            logger.info('Scaling pool by: %s', diff)
            for _ in range(abs(diff)):
                scale(pool=pool)
            logger.info('Autoscale sleeping for %s', self.interval)
            time.sleep(self.interval)

    def grow(self, pool):
        # Group.spawn() both starts the greenlet and adds it to the group,
        # so a separate pool.add() is redundant.
        pool.spawn(self.callback)

    def shrink(self, pool):
        if len(pool):
            glet = pool.greenlets.pop()
            pool.discard(glet)
            glet.kill(block=False)  # discarding alone would leave the worker running

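# Usage sketch (hypothetical names, for illustration only): each greenlet in
# the Group runs one copy of a blocking worker loop, so len(pool) equals the
# number of concurrent workers.
#
#     def backlog():                       # hypothetical: return queue depth
#         return 15
#
#     AutoScale(callback=blocking_worker,  # hypothetical blocking worker loop
#               sizer=Sizer(callback=backlog),
#               interval=10).run()         # blocks; re-evaluates every 10s
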
class Sizer(object):
    def __init__(self, callback):
        self.callback = callback

    def get(self):
        threshold = self.callback()
        if not threshold:
            size = 0
        elif threshold < 10:
            size = 2
        elif threshold < 20:
            size = 4
        elif threshold < 30:
            size = 6
        else:
            size = 9
        return size

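# Sizer maps the current queue backlog to a target pool size:
#     backlog 0      -> 0 greenlets
#     backlog 1-9    -> 2
#     backlog 10-19  -> 4
#     backlog 20-29  -> 6
#     backlog 30+    -> 9
# The thresholds and sizes are arbitrary tuning knobs, not derived values.
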
class BaseRMQConsumerV2(ConsumerProducerMixin):
    exchange_name = None
    default_exchange_name = 'default-exchange'
    declare_exchange_if_not_exists = False

    priority_queue = False
    priority_level = 10

    override_existing_queue = False  # be careful of the side effects while using this

    enable_delayed_retries = False
    delay_queue_name = None
    delay_in_seconds = 900
    max_retries = 3

    dead_letter_queue_name = None
    default_dead_letter_queue_name = 'dead-letter-queue'

    prefetch_count = 1

    auto_scale = False
    auto_scale_class = AutoScale

    def __init__(self, queue_name, routing_key, connection, config):
        self.connection = connection
        self.config = config
        if not queue_name:
            raise RuntimeError('Queue name is required')
        self.queue_name = queue_name
        self.queue_routing_key = routing_key or '#'

    def process_message(self, body, message):
        raise NotImplementedError("'{n}' needs to implement process_message(...)".format(n=self.__class__.__name__))

    def queue(self, **kwargs):
        options = {'routing_key': self.queue_routing_key, 'durable': True}
        if self.priority_queue:
            options['max_priority'] = self.priority_level
        if kwargs:
            options.update(kwargs)
        return Queue(self.queue_name, exchange=self.exchange(), **options)

    def exchange(self, **kwargs):
        name = self.exchange_name if self.exchange_name else self.default_exchange_name
        # passive=True asserts the exchange already exists instead of declaring it
        options = {'passive': True}
        if self.declare_exchange_if_not_exists:
            options = {'type': 'topic', 'passive': False, 'durable': True}
        if kwargs:
            options.update(kwargs)
        return Exchange(name, **options)

    def dead_letter_queue(self, **kwargs):
        name = self.dead_letter_queue_name if self.dead_letter_queue_name else self.default_dead_letter_queue_name
        options = {'routing_key': 'dead.#', 'durable': True}
        if kwargs:
            options.update(kwargs)
        return Queue(name, exchange=self.exchange(), **options)

    def delay_queue(self, **kwargs):
        name = self.delay_queue_name if self.delay_queue_name else '{q}-delay'.format(q=self.queue_name)
        exchange = self.exchange()
        options = {'routing_key': 'delay.{rk}'.format(rk=self.queue_routing_key), 'durable': True,
                   'queue_arguments': {'x-dead-letter-exchange': exchange.name,
                                       'x-dead-letter-routing-key': self.queue_routing_key}}
        if kwargs:
            options.update(kwargs)
        return Queue(name, exchange=exchange, **options)

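    # Delayed-retry wiring: failed messages are re-published to this queue
    # with a per-message TTL (set in publish() below). The delay queue has no
    # consumers, so when the TTL expires RabbitMQ dead-letters the message via
    # x-dead-letter-exchange / x-dead-letter-routing-key back onto the
    # original work queue, which is what turns the detour into a delayed retry.
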
def get_consumers(self, consumer, channel): | |
self._maybe_delete_queue(queue=self.queue()) | |
return [consumer(queues=[self.queue()], callbacks=[self._on_task], prefetch_count=self.prefetch_count)] | |
    def publish(self, message, reject=False):
        routing_key_prefix = 'dead' if reject else 'delay'
        if routing_key_prefix == 'dead':
            queue = self.dead_letter_queue()
            delay = None
            message.headers.pop('retries', None)  # reset retry header when pushing to the dead-letter queue
            if queue.name == self.queue_name:
                # this consumer is consuming the dead-letter queue itself;
                # re-publishing to it would deadlock, so skip the ack instead.
                raise SkipAcknowledgement(original_exc=None)
        elif routing_key_prefix == 'delay':
            if not self.enable_delayed_retries:
                # should never happen; means somebody is messing with this.
                raise RuntimeError('Routing to delay queue cannot happen if enable_delayed_retries is False')
            queue = self.delay_queue()
            delay = self.delay_in_seconds
            # alternative: version the delay queue name and give it an expiry instead of deleting it here.
            self._maybe_delete_queue(queue)
        else:
            raise RuntimeError('Unknown routing key prefix: {rk}'.format(rk=routing_key_prefix))
        data = dict(body=message.body,
                    headers=message.headers,
                    exchange=queue.exchange,
                    declare=[queue],
                    routing_key='{p}.{k}'.format(p=routing_key_prefix, k=message.delivery_info['routing_key']),
                    expiration=delay,
                    retry=True,
                    delivery_mode=PERSISTENT_DELIVERY_MODE)
        self.producer.publish(**data)

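    # Note: kombu's Producer.publish(expiration=...) takes seconds and sets the
    # AMQP per-message TTL on the wire, so delay_in_seconds=900 parks a retried
    # message in the delay queue for 15 minutes before it is dead-lettered back
    # to the work queue.
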
    def _on_task(self, body, message):
        logger.info('{m} with body: {b} received'.format(m=repr(message), b=body))
        try:
            try:
                self._can_retry(message=message)
                self.process_message(body, message)
            except (MaxRetriesExceeded, RejectToDeadLetterQueue) as e:
                logger.info('{m} failed due to {e}'.format(m=repr(message), e=repr(e)))
                self._add_to_error_trail(message=message, exc=e)
                self.publish(message=message, reject=True)
            except SkipAcknowledgement:
                raise
            except Exception as e:
                logger.exception('{m} failed due to {e}'.format(m=repr(message), e=repr(e)))
                self._add_to_error_trail(message=message, exc=e)
                self.publish(message=message, reject=not self.enable_delayed_retries)
            message.ack()
            logger.info('{m} body: {b} acked'.format(m=repr(message), b=body))
        except SkipAcknowledgement as e:
            # don't re-raise; swallowing this leaves the message unacked in the queue
            logger.exception('SkipAck Exception in {m} body: {b} due to {e}'.format(m=repr(message), b=body,
                                                                                    e=repr(e.original_exc)))
        except Exception as e:
            logger.exception('Exception in {m} body: {b} due to {e}'.format(m=repr(message), b=body, e=repr(e)))
            # Re-raised because the alerting system is expected to surface this. Without the raise we would
            # silently keep consuming; an exception at this level deserves to be brought to the notice of devs/ops.
            raise

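    # Flow summary: happy path -> process + ack; retryable failure -> re-publish
    # to the delay queue (or straight to the dead-letter queue when delayed
    # retries are off) and ack the original; SkipAcknowledgement -> leave the
    # delivery unacked, so the broker requeues it if the channel closes.
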
    def _can_retry(self, message):
        try:
            message.headers['retries'] += 1
        except KeyError:
            message.headers['retries'] = 1
        if message.headers['retries'] > self.max_retries:
            raise MaxRetriesExceeded(max_retries=self.max_retries)

    def _add_to_error_trail(self, message, exc):
        msg = 'Try {t}: due to {e}'.format(e=exc.message, t=message.headers['retries'])
        try:
            message.headers['error_trail'].append(msg)
        except KeyError:
            message.headers['error_trail'] = [msg]

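    # Retry bookkeeping rides along in the message headers: 'retries' is bumped
    # on every delivery and 'error_trail' accumulates one line per failure.
    # Both survive re-publishing because publish() copies message.headers, so a
    # message that exhausts max_retries lands in the dead-letter queue with its
    # full failure history attached.
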
    def _maybe_delete_queue(self, queue):
        """Delete the queue if its declaration parameters have changed."""
        try:
            queue.maybe_bind(channel=self.connection.channel())
            queue.declare()
        except PreconditionFailed as e:
            if not self.override_existing_queue:
                logger.exception(e)
                raise RuntimeError('PreconditionFailed usually means the queue was already declared with '
                                   'parameters that have since changed, e.g. routing_key or priority_level. '
                                   'Set override_existing_queue=True to override the existing declaration. '
                                   'Original error: {e}'.format(e=e.message))
            logger.warning('Deleting queue {q} due to {e}'.format(q=repr(queue), e=repr(e)))
            queue.delete(if_empty=True)

    def messages_in_queue(self):
        # re-declaring an existing queue is idempotent and returns
        # (name, message_count, consumer_count); index 1 is the backlog
        # size that Sizer uses to pick a pool size.
        queue = self.queue()
        queue.maybe_bind(self.connection)
        return queue.queue_declare()[1]

    def run(self):
        _run = super(BaseRMQConsumerV2, self).run
        if not self.auto_scale:
            _run()
        else:
            ascale = self.auto_scale_class(callback=_run, sizer=Sizer(callback=self.messages_in_queue))
            ascale.run()

=========== end of base_rmq_consumer_v2.py; worker script below ===========

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
gevent expandable queue consumer
"""
import logging
import sys
import time
from argparse import ArgumentParser

from kombu import Connection

from base_rmq_consumer_v2 import BaseRMQConsumerV2
from configuration import Configuration
from worker_log_conf import setup_basic_config

logging.basicConfig(level=logging.DEBUG)
log = logging.getLogger(__name__)

class Worker(BaseRMQConsumerV2):
    auto_scale = True

    def process_message(self, body, message):
        try:
            log.info('starting request {b}'.format(b=body))
            # body = json.loads(body)
            time.sleep(5)  # simulate work
            # requests.get(url='http://127.0.0.1:8001/b2b/corporate/?query=test&idd={b}'.format(b=body['id']))
            log.info('ending request {b}\n'.format(b=body))
        except Exception as e:
            log.error(e)

def main():
    parser = ArgumentParser()
    parser.add_argument("-u", "--url",
                        help="rabbitmq connection url, e.g.: amqp://b2badmin:b2badmin@localhost/b2b",
                        default='amqp://b2badmin:b2badmin@localhost/b2b')
    parser.add_argument("-b", "--b2b-base-url",
                        help="b2b server url, e.g.: http://localhost:8000/",
                        default='http://localhost:8000/')
    parser.add_argument("-lf", "--log-file-pathname",
                        help="log file pathname, e.g.: /ebs1/logs/task_queue.log",
                        default='/ebs1/logs/task_queue.log')
    args = parser.parse_args()

    setup_basic_config(args.log_file_pathname)

    config = Configuration(rabbitmq_url=args.url,
                           b2b_url_base=args.b2b_base_url,
                           async_timeout=600)

    with Connection(hostname=config.rabbitmq_url) as conn:
        worker = Worker(queue_name='test-queue', routing_key='test.#', connection=conn, config=config)
        worker.run()


if __name__ == '__main__':
    sys.exit(main() or 0)
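
# Example run (file name assumed; any RabbitMQ with the b2b vhost/user works):
#     python worker.py -u amqp://b2badmin:b2badmin@localhost/b2b
# Publish messages with a 'test.*' routing key to the exchange and watch the
# greenlet pool grow as the backlog crosses the Sizer thresholds.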