Skip to content

Instantly share code, notes, and snippets.

@saml
Created March 19, 2018 17:09
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save saml/11532534393096df3f8f8aa400ef35e4 to your computer and use it in GitHub Desktop.
Save saml/11532534393096df3f8f8aa400ef35e4 to your computer and use it in GitHub Desktop.
# -*- coding:utf-8 -*-
import logging
from kombu import Connection, Queue, Exchange
from kombu.mixins import ConsumerMixin
_log = logging.getLogger(__name__)
logging.basicConfig(level='DEBUG', format='%(asctime)s %(levelname)s %(name)s %(lineno)d %(process)d %(processName)s %(message)s')
class Worker(ConsumerMixin):
def __init__(self, connection, work_func, queue):
self.connection = connection
self.work_func = work_func
self.queue = queue
def get_consumers(self, Consumer, channel):
return [Consumer(queues=[self.queue], callbacks=[self.do_work])]
def do_work(self, *args, **kwargs):
try:
self.work_func(*args, **kwargs)
except:
_log.exception('Unexpected exception')
def test_yolo():
queue = Queue('abcd', Exchange('abcd'), 'abcd')
with Connection('memory:///', transport_options={'polling_interval': 0}) as connection:
producer = connection.Producer()
producer.publish({'hello': 'world'}, exchange=queue.exchange, routing_key=queue.routing_key, declare=[queue])
consumer = Worker(connection=connection, work_func=lambda x: True, queue=queue)
def work(*args, **kwargs):
_log.info('work %s %s', args, kwargs)
consumer.should_stop = True
consumer.work_func = work
consumer.run()
@CavalcanteLucas
Copy link

CavalcanteLucas commented Jun 7, 2021

Thank you for sharing that!
Very helpful code..

I was having trouble to encapsulate the worker in a way that unit tests could run without me having to type CTRL+C.
Lines 36-38 were the missing piece.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment