Created
July 13, 2022 14:43
-
-
Save abdulmateen59/63d8190e3bd4a062bce77373ab0a0549 to your computer and use it in GitHub Desktop.
Kombu(rabbitmq) consumer in separate thread
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import _thread | |
import threading | |
from abc import ABC | |
from abc import abstractmethod | |
from time import sleep | |
from kombu import Connection, Exchange, Queue | |
from kombu.mixins import ConsumerMixin | |
from kombu.utils.debug import setup_logging | |
from loguru import logger | |
from pydantic import BaseModel, Field | |
setup_logging(loglevel='INFO', loggers=['']) | |
class QueueBinding(BaseModel): | |
queue: str = Field(description='Queue name') | |
routing_key: str = Field(description='Routing key') | |
exchange: str = Field(description='Exchange name') | |
exchange_type: str = Field(default='topic', description='Exchange type, default is topic, ' | |
'others are direct | fanout | headers') | |
is_exchange_durable: bool = Field(default=True, description='Survive server restarts, default is True') | |
class Worker(ConsumerMixin, threading.Thread, ABC): | |
""" | |
Rabbitmq worker interface | |
author: Abdul | |
""" | |
def __init__(self, connection: Connection): | |
self.connection = connection | |
self._queues: list[Queue] = [] | |
super(Worker, self).__init__(daemon=True) | |
@property | |
def queues(self) -> list[Queue]: | |
return self._queues | |
@queues.setter | |
def queues(self, queues: QueueBinding | list[QueueBinding]) -> None: | |
if isinstance(queues, list): | |
for item in queues: | |
self._queues.append(Queue(item.queue, | |
Exchange(item.exchange, type=item.exchange_type, | |
durable=item.is_exchange_durable), | |
routing_key=item.routing_key)) | |
else: | |
self._queues = [Queue(queues.queue, | |
Exchange(queues.exchange, type=queues.exchange_type, | |
durable=queues.is_exchange_durable), | |
routing_key=queues.routing_key)] | |
def get_consumers(self, Consumer, channel): | |
""" | |
Returns a list of consumers with the channel, callbacks and queues specified. | |
""" | |
try: | |
if self._queues: | |
return [Consumer(queues=self.queues, | |
accept=['json', 'yaml'], | |
callbacks=[self.callback], | |
prefetch_count=1)] | |
else: | |
raise RuntimeError('No Queue defined!!!') | |
except Exception as e: | |
if not self.queues: | |
print('No Queue defined!!!') | |
logger.info(e) | |
_thread.interrupt_main() | |
@abstractmethod | |
def process_task(self, body) -> None: | |
raise NotImplementedError | |
def callback(self, body, message) -> None: | |
self.process_task(body) | |
message.ack() | |
def runner(self, in_thread: bool = False) -> None: | |
""" | |
Worker runner, pass in_thread=True to run in a thread and false to run in main thread | |
""" | |
if in_thread: | |
self.start() | |
else: | |
self.run() | |
class RandomTaskWorker(Worker): | |
""" | |
Example Worker | |
""" | |
def process_task(self, body): | |
print(10 * "*", self.name, '-----', body, 10 * "*") | |
sleep(1) | |
if __name__ == '__main__': | |
# One consumer consuming different queues | |
with Connection('amqp://rabbit-user:rabbit-password@localhost:5672/rabbit-vhost') as conn: | |
task = RandomTaskWorker(conn) | |
task.queues = [QueueBinding(queue='Task-1-Queue', exchange='Task-1', routing_key='T1'), | |
QueueBinding(queue='Task-2-Queue', exchange='Task-2', routing_key='T2')] | |
task.runner(in_thread=True) | |
# Separate consumers for each queue | |
with Connection('amqp://rabbit-user:rabbit-password@localhost:5672/rabbit-vhost') as conn: | |
task_1_worker = RandomTaskWorker(conn) | |
task_1_worker.queues = QueueBinding(queue='Task-1-Queue', exchange='Task-1', routing_key='T1') | |
task_1_worker.runner(in_thread=True) | |
with Connection('amqp://rabbit-user:rabbit-password@localhost:5672/rabbit-vhost') as conn: | |
task_2_worker = RandomTaskWorker(conn) | |
task_2_worker.queues = QueueBinding(queue='Task-2-Queue', exchange='Task-2', routing_key='T2') | |
task_2_worker.runner(in_thread=True) | |
# Main Thread | |
while True: | |
print(10 * "*", threading.current_thread().name, '-----', 'Hey, I am alive', 10 * "*") | |
sleep(5) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from time import sleep | |
from urllib3 import Retry | |
from kombu import Connection, Exchange | |
from kombu.pools import producers | |
def send_as_task(connection, exchange, routing_key, data): | |
with producers[connection].acquire(block=True) as producer: | |
producer.publish(data, | |
serializer='json', | |
exchange=exchange, | |
declare=[exchange], | |
routing_key=routing_key, | |
Retry=True) | |
if __name__ == '__main__': | |
connection = Connection('amqp://rabbit-user:rabbit-password@localhost:5672/rabbit-vhost') | |
while True: | |
exchange = Exchange('Task-1', type='topic') | |
send_as_task(connection, exchange=exchange, routing_key='T1', data="{me: too}") | |
print('T-1 Published...') | |
sleep(2) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from time import sleep | |
from kombu import Connection, Exchange | |
from kombu.pools import producers | |
def send_as_task(connection, exchange, routing_key, data): | |
with producers[connection].acquire(block=True) as producer: | |
producer.publish(data, | |
serializer='json', | |
exchange=exchange, | |
declare=[exchange], | |
routing_key=routing_key) | |
if __name__ == '__main__': | |
connection = Connection('amqp://rabbit-user:rabbit-password@localhost:5672/rabbit-vhost') | |
while True: | |
exchange = Exchange('Task-2', type='topic') | |
send_as_task(connection, exchange=exchange, routing_key='T2', data="{me: three}") | |
print('T-2 Published...') | |
sleep(3) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment