Skip to content

Instantly share code, notes, and snippets.

@thulasi-ram
Created January 23, 2018 06:14
Show Gist options
  • Save thulasi-ram/6f0a867e8303047a76cb0d2b56af8de4 to your computer and use it in GitHub Desktop.
Save thulasi-ram/6f0a867e8303047a76cb0d2b56af8de4 to your computer and use it in GitHub Desktop.
# -*- coding: utf-8 -*-
from gevent import monkey
monkey.patch_all()
import logging
import time
from amqp import PreconditionFailed
from kombu import Exchange
from kombu import Queue
from kombu.entity import PERSISTENT_DELIVERY_MODE
from kombu.mixins import ConsumerProducerMixin
from exception import MaxRetriesExceeded, RejectToDeadLetterQueue, SkipAcknowledgement
from gevent.pool import Group
logger = logging.getLogger(__name__)
class AutoScale(object):
def __init__(self, callback, sizer, interval=10):
self.interval = interval
self.callback = callback
self.sizer = sizer
# thread = threading.Thread(target=self.run, args=())
# thread.daemon = True
# thread.start()
def run(self):
""" Method that runs forever """
pool = Group()
while True:
size = self.sizer.get()
diff = size - len(pool)
scale = self.grow if diff >= 0 else self.shrink
logger.info('Scaling pool by: %s', diff)
for i in range(1, abs(diff) + 1):
scale(pool=pool)
logger.info('Autoscale sleeping for %s', self.interval)
time.sleep(self.interval)
def grow(self, pool):
glet = pool.spawn(self.callback)
pool.add(glet)
def shrink(self, pool):
if len(pool):
glet = pool.greenlets.pop()
pool.discard(glet)
class Sizer(object):
def __init__(self, callback):
self.callback = callback
def get(self):
threshold = self.callback()
if not threshold:
size = 0
elif threshold in range(1, 10):
size = 2
elif threshold in range(10, 20):
size = 4
elif threshold in range(20, 30):
size = 6
else:
size = 9
return size
class BaseRMQConsumerV2(ConsumerProducerMixin):
exchange_name = None
default_exchange_name = 'default-exchange'
declare_exchange_if_not_exists = False
priority_queue = False
priority_level = 10
override_existing_queue = False # be careful of the after effects while using this
enable_delayed_retries = False
delay_queue_name = None
delay_in_seconds = 900
max_retries = 3
dead_letter_queue_name = None
default_dead_letter_queue_name = 'dead-letter-queue'
prefetch_count = 1
auto_scale = False
auto_scale_class = AutoScale
def __init__(self, queue_name, routing_key, connection, config):
self.connection = connection
self.config = config
if not queue_name:
raise RuntimeError('Queue name is required')
self.queue_name = queue_name
self.queue_routing_key = routing_key or '#'
def process_message(self, body, message):
raise NotImplementedError("'{n}' needs to implement process_message(...)".format(n=self.__class__.__name__))
def queue(self, **kwargs):
options = {'routing_key': self.queue_routing_key, 'durable': True}
if self.priority_queue:
options['max_priority'] = self.priority_level
if kwargs:
options.update(kwargs)
return Queue(self.queue_name, exchange=self.exchange(), **options)
def exchange(self, **kwargs):
name = self.exchange_name if self.exchange_name else self.default_exchange_name
options = {'passive': True}
if self.declare_exchange_if_not_exists:
options = {'type': 'topic', 'passive': False, 'durable': True}
if kwargs:
options.update(kwargs)
return Exchange(name, **options)
def dead_letter_queue(self, **kwargs):
name = self.dead_letter_queue_name if self.dead_letter_queue_name else self.default_dead_letter_queue_name
options = {'routing_key': 'dead.#', 'durable': True}
if kwargs:
options.update(kwargs)
return Queue(name, exchange=self.exchange(), **options)
def delay_queue(self, **kwargs):
name = self.delay_queue_name if self.delay_queue_name else '{q}-delay'.format(q=self.queue_name)
exchange = self.exchange()
options = {'routing_key': 'delay.{rk}'.format(rk=self.queue_routing_key), 'durable': True,
'queue_arguments': {'x-dead-letter-exchange': exchange.name,
"x-dead-letter-routing-key": self.queue_routing_key}}
if kwargs:
options.update(kwargs)
return Queue(name, exchange=exchange, **options)
def get_consumers(self, consumer, channel):
self._maybe_delete_queue(queue=self.queue())
return [consumer(queues=[self.queue()], callbacks=[self._on_task], prefetch_count=self.prefetch_count)]
def publish(self, message, reject=False):
routing_key_prefix = 'dead' if reject else 'delay'
if routing_key_prefix == 'dead':
queue = self.dead_letter_queue()
delay = None
message.headers.pop('retries', None) # reset retry header if pushing to dead letter queue
if queue.name == self.queue_name:
# ideally to check if dead-letter queue is its own dead letter queue
# without this check will cause deadlock.
raise SkipAcknowledgement(original_exc=None)
elif routing_key_prefix == 'delay':
if not self.enable_delayed_retries:
# should never happen. Means somebody is messing with this.
raise RuntimeError('Routing to delay queue cannot happen if enable_delayed_retries is False')
queue = self.delay_queue()
delay = self.delay_in_seconds
self._maybe_delete_queue(queue) # instead rely on changing delay queue name and set an expiration for it.
else:
raise RuntimeError('Unknown routing key prefix: {rk}'.format(rk=routing_key_prefix))
data = dict(body=message.body,
headers=message.headers,
exchange=queue.exchange,
declare=[queue],
routing_key='{p}.{k}'.format(p=routing_key_prefix, k=message.delivery_info['routing_key']),
expiration=delay,
retry=True,
delivery_mode=PERSISTENT_DELIVERY_MODE)
self.producer.publish(**data)
def _on_task(self, body, message):
logger.info('{m} with body: {b} received'.format(m=repr(message), b=body))
try:
try:
self._can_retry(message=message)
self.process_message(body, message)
except (MaxRetriesExceeded, RejectToDeadLetterQueue) as e:
logger.info('{m} failed due to {e}'.format(m=repr(message), e=repr(e)))
self._add_to_error_trail(message=message, exc=e)
self.publish(message=message, reject=True)
except SkipAcknowledgement:
raise
except Exception as e:
logger.exception('{m} failed due to {e}'.format(m=repr(message), e=repr(e)))
self._add_to_error_trail(message=message, exc=e)
self.publish(message=message, reject=not self.enable_delayed_retries)
message.ack()
logger.info('{m} body: {b} acked'.format(m=repr(message), b=body))
except SkipAcknowledgement as e:
# Don't raise. passing of this will let the message dangle in the queue
logger.exception('SkipAck Exception in {m} body: {b} due to {e}'.format(m=repr(message), b=body,
e=repr(e.original_exc)))
except Exception as e:
logger.exception('Exception in {m} body: {b} due to {e}'.format(m=repr(message), b=body, e=repr(e)))
# Currently raising since alerting system is supposed to highlight this. Without raise we will just keep
# consuming. Exception at this level deserved to be bought to the notice of devs/ops.
raise
def _can_retry(self, message):
try:
message.headers['retries'] += 1
except KeyError:
message.headers['retries'] = 1
if message.headers['retries'] > self.max_retries:
raise MaxRetriesExceeded(max_retires=self.max_retries)
def _add_to_error_trail(self, message, exc):
msg = 'Try {t}: due to {e}'.format(e=exc.message, t=message.headers['retries'])
try:
message.headers['error_trail'].append(msg)
except KeyError:
message.headers['error_trail'] = [msg]
def _maybe_delete_queue(self, queue):
""" Delete the queue if queue declaration parameters have changed. """
try:
queue.maybe_bind(channel=self.connection.channel())
queue.declare()
except PreconditionFailed as e:
if not self.override_existing_queue:
logger.exception(e)
raise RuntimeError('Precondition Failed usually means the queue is already declared with few params '
'which have changed. Eg: routing_key, priority_level. '
'Set override_existing_queue=True to override existing queue declaration. '
'Original Error: {e}'.format(e=e.message))
logger.warning('Deleting queue {q} due to {e}'.format(q=repr(queue), e=repr(e)))
queue.delete(if_empty=True)
def messages_in_queue(self):
queue = self.queue()
queue.maybe_bind(self.connection)
m = queue.queue_declare()[1]
return m
def run(self):
_run = super(BaseRMQConsumerV2, self).run
if not self.auto_scale:
_run()
else:
ascale = self.auto_scale_class(callback=_run, sizer=Sizer(callback=self.messages_in_queue))
ascale.run()
===========
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
gevent expandable queue consumer
"""
import logging
import sys
import time
from argparse import ArgumentParser
from kombu import Connection
from base_rmq_consumer_v2 import BaseRMQConsumerV2
from configuration import Configuration
from worker_log_conf import setup_basic_config
logging.basicConfig(level=logging.DEBUG)
handler = logging.StreamHandler()
handler.setLevel(logging.DEBUG)
log = logging.getLogger(__name__)
class Worker(BaseRMQConsumerV2):
auto_scale = True
def process_message(self, body, message):
try:
log.info('starting request {b}'.format(b=body))
# body = json.loads(body)
time.sleep(5)
# requests.get(url='http://127.0.0.1:8001/b2b/corporate/?query=test&idd={b}'.format(b=body['id']))
log.info('ending request {b}\n'.format(b=body))
except Exception as e:
log.error(e)
def main():
parser = ArgumentParser()
parser.add_argument("-u", "--url",
help="rabbitmq connection url, e.g.: amqp://b2badmin:b2badmin@localhost/b2b",
default='amqp://b2badmin:b2badmin@localhost/b2b')
parser.add_argument("-b", "--b2b-base-url",
help="b2b server url, e.g.: http://localhost:8000/",
default='http://localhost:8000/')
parser.add_argument("-lf", "--log-file-pathname",
help="log file pathname, e.g.: /ebs1/logs/task_queue.log",
default='/ebs1/logs/task_queue.log')
args = parser.parse_args()
setup_basic_config(args.log_file_pathname)
config = Configuration(rabbitmq_url=args.url,
b2b_url_base=args.b2b_base_url,
async_timeout=600)
with Connection(hostname=config.rabbitmq_url) as conn:
worker = Worker(queue_name='test-queue', routing_key='test.#', connection=conn, config=config)
worker.run()
if __name__ == '__main__':
sys.exit(main() or 0)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment