Skip to content

Instantly share code, notes, and snippets.

@miku
Last active July 30, 2021 05:35
Show Gist options
  • Save miku/5850686 to your computer and use it in GitHub Desktop.
Save miku/5850686 to your computer and use it in GitHub Desktop.
Kombu example
from __future__ import with_statement
from kombu.common import maybe_declare
from kombu.pools import producers
from queues import task_exchange
priority_to_routing_key = {'high': 'high',
'mid': 'mid',
'low': 'low'}
def send_as_task(connection, fun, args=(), kwargs={}, priority='mid'):
payload = {'fun': fun, 'args': args, 'kwargs': kwargs}
routing_key = priority_to_routing_key[priority]
with producers[connection].acquire(block=True) as producer:
maybe_declare(task_exchange, producer.channel)
producer.publish(payload,
serializer='pickle',
compression='bzip2',
exchange=task_exchange,
routing_key=routing_key)
if __name__ == '__main__':
from kombu import Connection
from tasks import hello_task
import random
connection = Connection('amqp://guest:guest@localhost:5672//')
for i in range(1000000):
prio = 'high' if random.random() > 0.3 else 'low'
send_as_task(connection, fun=hello_task, args=('Kombu-%s' % i, ),
kwargs={}, priority=prio)
#!/usr/bin/env python
# coding: utf-8
from kombu import Exchange, Queue
task_exchange = Exchange('tasks', type='direct')
task_queues = [
Queue('high-prio', task_exchange, routing_key='high'),
Queue('low-prio', task_exchange, routing_key='low'),
Queue('mid-prio', task_exchange, routing_key='mid'),
]
def hello_task(who="world"):
print("Hello %s" % (who, ))
from __future__ import with_statement
from kombu.mixins import ConsumerMixin
from kombu.log import get_logger
from kombu.utils import kwdict, reprcall
from queues import task_queues
logger = get_logger(__name__)
class Worker(ConsumerMixin):
def __init__(self, connection):
self.connection = connection
def get_consumers(self, Consumer, channel):
return [Consumer(queues=task_queues,
callbacks=[self.process_task])]
def process_task(self, body, message):
fun = body['fun']
args = body['args']
kwargs = body['kwargs']
logger.info('Got task: %s', reprcall(fun.__name__, args, kwargs))
try:
fun(*args, **kwdict(kwargs))
except Exception, exc:
logger.error('task raised exception: %r', exc)
message.ack()
if __name__ == '__main__':
from kombu import Connection
from kombu.utils.debug import setup_logging
setup_logging(loglevel='DEBUG')
with Connection('amqp://guest:guest@localhost:5672//') as conn:
try:
Worker(conn).run()
except KeyboardInterrupt:
print('bye bye')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment