Created
June 27, 2019 13:20
-
-
Save Sovetnikov/1633e0326a3603be10f4b7ed9f35f628 to your computer and use it in GitHub Desktop.
Dramatiq eager broker for unittest
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from dramatiq import Worker | |
from dramatiq.brokers.stub import StubBroker | |
from dramatiq.worker import _WorkerThread, _ConsumerThread | |
class EagerBroker(StubBroker): | |
def __init__(self, *args, **kwargs): | |
super(EagerBroker, self).__init__(*args, **kwargs) | |
self.worker = Worker(self, worker_timeout=100) | |
self.worker_thread = _WorkerThread( | |
broker=self.worker.broker, | |
consumers=self.worker.consumers, | |
work_queue=self.worker.work_queue, | |
worker_timeout=self.worker.worker_timeout | |
) | |
def enqueue(self, message, *, delay=None): | |
# Эмулируем по полной | |
# Ставим сообщение в очередь | |
message = super().enqueue(message, delay=delay) | |
for queue_name in self.get_declared_queues(): | |
# Перебираем все очереди и получаем консьюмеры | |
consumer = self.consume( | |
queue_name=queue_name, | |
prefetch=False, | |
timeout=self.worker.worker_timeout, | |
) | |
if not queue_name in self.worker.consumers: | |
# Если в воркере не прописан консьюмер то создаём его, с "потоком" | |
consumer_thread = _ConsumerThread( | |
broker=self, | |
queue_name=queue_name, | |
prefetch=False, | |
work_queue=self.worker.work_queue, | |
worker_timeout=self.worker.worker_timeout, | |
) | |
consumer_thread.consumer = consumer | |
self.worker.consumers[queue_name] = consumer_thread | |
for message in consumer: | |
# Обрабатываем каждое сообщение через work_queue | |
if message is not None: | |
actor = self.get_actor(message.actor_name) | |
self.worker.work_queue.put((actor.priority, message)) | |
self.worker_thread.process_message(message) | |
else: | |
# Сообщения в очереди кончились | |
break |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment