Skip to content

Instantly share code, notes, and snippets.

@brizzbane
Created August 31, 2015 17:47
Show Gist options
  • Save brizzbane/07c1b2699d530d449a1c to your computer and use it in GitHub Desktop.
Save brizzbane/07c1b2699d530d449a1c to your computer and use it in GitHub Desktop.
__author__ = 'brizzbane'
# For some reason, the 'Hello ...' greeting is only displayed some of the time
# (e.g. after running `python thisfile.py produce` 5 times, the consumer may
# only print the greeting 3 times).
import logging
from mtm.v2.core.log import configure_logging
# Logging is configured at import time, before the kombu imports below run.
# NOTE(review): configure_logging is project-local; which handlers and levels
# it installs is not visible from this file -- confirm against mtm.v2.core.log.
configure_logging()
import sys
from kombu import Exchange, Queue
from kombu.mixins import ConsumerMixin
from kombu.utils import kwdict, reprcall
# Single direct exchange that all task messages are meant to go through.
task_exchange = Exchange('tasks', type='direct')

# One queue per priority level; each queue is bound to the exchange with a
# routing key equal to its own name.
task_queues = [
    Queue(queue_name, task_exchange, routing_key=queue_name)
    for queue_name in ('hipri', 'midpri', 'lopri')
]

# Module-level logger shared by the task functions and the script entry point.
logger = logging.getLogger('kombu.worker')
class Worker(ConsumerMixin):
    """Consume task messages from ``task_queues`` and execute them.

    Each message body is expected to be a mapping with the keys ``'fun'``
    (a callable), ``'args'`` (positional arguments) and ``'kwargs'``
    (keyword arguments), as built by ``send_as_task``.
    """

    def __init__(self, connection):
        """Remember the broker connection and set up the worker logger.

        Args:
            connection: the kombu connection to consume from.
        """
        self.logger = logging.getLogger('kombu.worker')
        # NOTE(review): presumably read by ConsumerMixin.run() -- see the
        # kombu mixin documentation.
        self.connection = connection

    def get_consumers(self, Consumer, channel):
        """Return the consumers to run: one consumer over all task queues.

        Args:
            Consumer: consumer factory supplied by the mixin.
            channel: channel supplied by the mixin (unused here).
        """
        # SECURITY: accepting 'pickle' means message bodies are unpickled on
        # receipt. Only do this when every publisher on the broker is
        # trusted; unpickling untrusted data can execute arbitrary code.
        return [Consumer(queues=task_queues,
                         accept=['pickle', 'json'],
                         callbacks=[self.process_task])]

    def process_task(self, body, message):
        """Run the task described by ``body`` and acknowledge ``message``.

        Any ``Exception`` raised while unpacking the payload or running the
        task is logged with its traceback; the message is acknowledged
        either way, so a failing or malformed task is not retried.

        Args:
            body: decoded message payload (see class docstring).
            message: the received message; ``ack()`` is called on it.
        """
        try:
            # Unpack inside the try block: a payload with a missing key or a
            # non-callable 'fun' (e.g. a JSON body) must be logged and
            # acknowledged like any other task failure instead of raising
            # out of the callback before the ack.
            fun = body['fun']
            args = body['args']
            kwargs = body['kwargs']
            fun_name = getattr(fun, '__name__', repr(fun))
            self.logger.info('Got task: %s', reprcall(fun_name, args, kwargs))
            fun(*args, **kwdict(kwargs))
        except Exception as exc:
            # logger.exception keeps the traceback, which logger.error drops.
            self.logger.exception('task raised exception: %r', exc)
        message.ack()
def send_as_task(connection, fun, args=(), kwargs=None, priority='mid'):
    """Publish a call to ``fun`` as a task message.

    The payload ``{'fun': fun, 'args': args, 'kwargs': kwargs}`` is pickled,
    bzip2-compressed and published to ``task_exchange`` with the routing key
    that corresponds to ``priority``.

    Args:
        connection: kombu connection used to look up a pooled producer.
        fun: the callable to run on the worker; it must be picklable.
        args: positional arguments for ``fun``.
        kwargs: keyword arguments for ``fun``; ``None`` means no keyword
            arguments.
        priority: one of ``'high'``, ``'mid'`` or ``'low'``.

    Raises:
        KeyError: if ``priority`` is not one of the known levels.
    """
    from kombu.pools import producers
    from kombu.common import maybe_declare

    # A ``{}`` default would be one dict object shared by every call.
    if kwargs is None:
        kwargs = {}
    payload = {'fun': fun, 'args': args, 'kwargs': kwargs}
    priority_to_routing_key = {'high': 'hipri',
                               'mid': 'midpri',
                               'low': 'lopri'}
    routing_key = priority_to_routing_key[priority]
    with producers[connection].acquire(block=True) as producer:
        maybe_declare(task_exchange, producer.channel)
        # NOTE(review): only the exchange is declared here; the queues are
        # declared by the consumer side. Confirm a consumer has run at least
        # once, otherwise the broker may drop messages it cannot route.
        producer.publish(payload,
                         serializer='pickle',
                         compression='bzip2',
                         # Publish to the declared exchange explicitly;
                         # without this the producer's default exchange is
                         # used instead of 'tasks'.
                         exchange=task_exchange,
                         routing_key=routing_key)
def hello_task(who='world'):
    """Log a greeting for ``who`` at INFO level on the module logger.

    Args:
        who: the value to greet; rendered with ``%s``.
    """
    # Pass ``who`` as a logging argument instead of pre-formatting with
    # ``%``: formatting is deferred until the record is actually emitted.
    logger.info('Hello %s', who)
if __name__ == '__main__':
    # Script entry point: ``produce`` publishes one hello_task message,
    # ``consume`` runs a Worker until interrupted with Ctrl-C.
    from kombu import Connection

    broker_url = 'amqp://guest:guest@localhost:5672//'
    # A missing argument used to raise IndexError, and an unknown command
    # was silently ignored; both now end with a usage message.
    command = sys.argv[1] if len(sys.argv) > 1 else None

    if command == 'produce':
        logger.info('Connecting ...')
        with Connection(broker_url) as conn:
            logger.info('Connected.')
            logger.info('Sending tasks ...')
            # NOTE(review): send_as_task publishes through kombu's producer
            # pool, which presumably uses its own pooled connection, not
            # ``conn`` itself -- confirm the message is flushed before the
            # process exits.
            send_as_task(conn, fun=hello_task, args=('Kombu',), kwargs={},
                         priority='high')
    elif command == 'consume':
        logger.info('Connecting ...')
        with Connection(broker_url) as conn:
            logger.info('Connected.')
            logger.info('Awaiting tasks ...')
            try:
                Worker(conn).run()
            except KeyboardInterrupt:
                logger.info('Consumer Exiting.')
    else:
        # sys.exit with a string prints it to stderr and exits with status 1.
        sys.exit('usage: %s produce|consume' % sys.argv[0])
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment