
@abdulmateen59
Created July 13, 2022 14:43
Kombu(rabbitmq) consumer in separate thread
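In outline: subclass Worker, implement process_task, assign one or more QueueBinding objects to the queues property, and call runner(in_thread=True) to consume on a daemon thread while the main thread stays free. A condensed sketch of that flow (the full listing follows):

worker = RandomTaskWorker(Connection('amqp://rabbit-user:rabbit-password@localhost:5672/rabbit-vhost'))
worker.queues = QueueBinding(queue='Task-1-Queue', exchange='Task-1', routing_key='T1')
worker.runner(in_thread=True)   # consume in a background (daemon) thread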
import _thread
import threading
from abc import ABC
from abc import abstractmethod
from time import sleep

from kombu import Connection, Exchange, Queue
from kombu.mixins import ConsumerMixin
from kombu.utils.debug import setup_logging
from loguru import logger
from pydantic import BaseModel, Field

setup_logging(loglevel='INFO', loggers=[''])

class QueueBinding(BaseModel):
    queue: str = Field(description='Queue name')
    routing_key: str = Field(description='Routing key')
    exchange: str = Field(description='Exchange name')
    exchange_type: str = Field(default='topic', description='Exchange type, default is topic; '
                                                            'others are direct | fanout | headers')
    is_exchange_durable: bool = Field(default=True, description='Survive server restarts, default is True')

class Worker(ConsumerMixin, threading.Thread, ABC):
    """
    RabbitMQ worker interface.

    ConsumerMixin provides the consume loop and threading.Thread lets it run in
    the background: Thread.start() dispatches to ConsumerMixin.run(), since the
    mixin precedes Thread in the MRO.
    author: Abdul
    """

    def __init__(self, connection: Connection):
        self.connection = connection
        self._queues: list[Queue] = []
        super().__init__(daemon=True)   # daemon thread: killed when the main thread exits

    @property
    def queues(self) -> list[Queue]:
        return self._queues

    @queues.setter
    def queues(self, queues: QueueBinding | list[QueueBinding]) -> None:
        if isinstance(queues, list):
            for item in queues:
                self._queues.append(Queue(item.queue,
                                          Exchange(item.exchange, type=item.exchange_type,
                                                   durable=item.is_exchange_durable),
                                          routing_key=item.routing_key))
        else:
            self._queues = [Queue(queues.queue,
                                  Exchange(queues.exchange, type=queues.exchange_type,
                                           durable=queues.is_exchange_durable),
                                  routing_key=queues.routing_key)]

    def get_consumers(self, Consumer, channel):
        """
        Return a list of consumers bound to the configured queues and callback.
        """
        try:
            if not self._queues:
                raise RuntimeError('No queue defined!')
            return [Consumer(queues=self.queues,
                             accept=['json', 'yaml'],
                             callbacks=[self.callback],
                             prefetch_count=1)]
        except Exception as e:
            logger.error(e)
            # Raise KeyboardInterrupt in the main thread so a misconfigured
            # worker does not fail silently in its background thread.
            _thread.interrupt_main()

    @abstractmethod
    def process_task(self, body) -> None:
        raise NotImplementedError

    def callback(self, body, message) -> None:
        self.process_task(body)
        message.ack()   # acknowledge only after the task has been processed

    def runner(self, in_thread: bool = False) -> None:
        """
        Worker runner: pass in_thread=True to consume in a background thread,
        False to block the calling (main) thread.
        """
        if in_thread:
            self.start()
        else:
            self.run()

class RandomTaskWorker(Worker):
    """
    Example worker
    """

    def process_task(self, body):
        print(10 * "*", self.name, '-----', body, 10 * "*")
        sleep(1)

if __name__ == '__main__':
    # One consumer consuming from several queues
    with Connection('amqp://rabbit-user:rabbit-password@localhost:5672/rabbit-vhost') as conn:
        task = RandomTaskWorker(conn)
        task.queues = [QueueBinding(queue='Task-1-Queue', exchange='Task-1', routing_key='T1'),
                       QueueBinding(queue='Task-2-Queue', exchange='Task-2', routing_key='T2')]
        task.runner(in_thread=True)

    # Separate consumer for each queue
    with Connection('amqp://rabbit-user:rabbit-password@localhost:5672/rabbit-vhost') as conn:
        task_1_worker = RandomTaskWorker(conn)
        task_1_worker.queues = QueueBinding(queue='Task-1-Queue', exchange='Task-1', routing_key='T1')
        task_1_worker.runner(in_thread=True)

    with Connection('amqp://rabbit-user:rabbit-password@localhost:5672/rabbit-vhost') as conn:
        task_2_worker = RandomTaskWorker(conn)
        task_2_worker.queues = QueueBinding(queue='Task-2-Queue', exchange='Task-2', routing_key='T2')
        task_2_worker.runner(in_thread=True)

    # Main thread: stay alive so the daemon consumer threads keep running
    while True:
        print(10 * "*", threading.current_thread().name, '-----', 'Hey, I am alive', 10 * "*")
        sleep(5)
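The consumer threads above are daemons, so they are terminated abruptly when the main thread exits. If a cleaner shutdown is wanted, kombu's ConsumerMixin exposes a should_stop flag that ends its consume loop; the helper below is a minimal sketch of using it (stop_workers and the SIGTERM wiring are illustrative additions, not part of the gist):

import signal

def stop_workers(workers):
    # ConsumerMixin checks should_stop between iterations of its consume loop,
    # so each worker finishes the message it is handling and then exits.
    for worker in workers:
        worker.should_stop = True
    for worker in workers:
        worker.join(timeout=5)

# e.g. stop both task workers on SIGTERM instead of relying on daemon threads:
signal.signal(signal.SIGTERM, lambda *_: stop_workers([task_1_worker, task_2_worker]))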
Producer for the Task-1 exchange (a separate script in the gist):

from time import sleep

from kombu import Connection, Exchange
from kombu.pools import producers


def send_as_task(connection, exchange, routing_key, data):
    # Acquire a producer from kombu's shared producer pool for this connection.
    with producers[connection].acquire(block=True) as producer:
        producer.publish(data,
                         serializer='json',
                         exchange=exchange,
                         declare=[exchange],
                         routing_key=routing_key,
                         retry=True)


if __name__ == '__main__':
    connection = Connection('amqp://rabbit-user:rabbit-password@localhost:5672/rabbit-vhost')
    while True:
        exchange = Exchange('Task-1', type='topic')
        send_as_task(connection, exchange=exchange, routing_key='T1', data="{me: too}")
        print('T-1 Published...')
        sleep(2)
Producer for the Task-2 exchange (another separate script):

from time import sleep

from kombu import Connection, Exchange
from kombu.pools import producers


def send_as_task(connection, exchange, routing_key, data):
    with producers[connection].acquire(block=True) as producer:
        producer.publish(data,
                         serializer='json',
                         exchange=exchange,
                         declare=[exchange],
                         routing_key=routing_key)


if __name__ == '__main__':
    connection = Connection('amqp://rabbit-user:rabbit-password@localhost:5672/rabbit-vhost')
    while True:
        exchange = Exchange('Task-2', type='topic')
        send_as_task(connection, exchange=exchange, routing_key='T2', data="{me: three}")
        print('T-2 Published...')
        sleep(3)
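The example payloads above are plain strings; because the producers publish with serializer='json', a dict also survives the round trip and reaches process_task as a dict. A minimal sketch under that assumption (the payload and retry_policy values are illustrative, not from the gist):

from kombu import Connection, Exchange
from kombu.pools import producers

connection = Connection('amqp://rabbit-user:rabbit-password@localhost:5672/rabbit-vhost')
exchange = Exchange('Task-1', type='topic')

with producers[connection].acquire(block=True) as producer:
    producer.publish({'task_id': 42, 'action': 'resize'},   # dict payload, JSON-serialized by kombu
                     serializer='json',
                     exchange=exchange,
                     declare=[exchange],
                     routing_key='T1',
                     retry=True,
                     retry_policy={'max_retries': 3,         # illustrative connection-retry settings
                                   'interval_start': 0,
                                   'interval_step': 1,
                                   'interval_max': 5})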