Skip to content

Instantly share code, notes, and snippets.

@Sovetnikov
Created June 27, 2019 13:20
Show Gist options
  • Save Sovetnikov/1633e0326a3603be10f4b7ed9f35f628 to your computer and use it in GitHub Desktop.
Save Sovetnikov/1633e0326a3603be10f4b7ed9f35f628 to your computer and use it in GitHub Desktop.
Dramatiq eager broker for unittest
from dramatiq import Worker
from dramatiq.brokers.stub import StubBroker
from dramatiq.worker import _WorkerThread, _ConsumerThread
class EagerBroker(StubBroker):
def __init__(self, *args, **kwargs):
super(EagerBroker, self).__init__(*args, **kwargs)
self.worker = Worker(self, worker_timeout=100)
self.worker_thread = _WorkerThread(
broker=self.worker.broker,
consumers=self.worker.consumers,
work_queue=self.worker.work_queue,
worker_timeout=self.worker.worker_timeout
)
def enqueue(self, message, *, delay=None):
# Эмулируем по полной
# Ставим сообщение в очередь
message = super().enqueue(message, delay=delay)
for queue_name in self.get_declared_queues():
# Перебираем все очереди и получаем консьюмеры
consumer = self.consume(
queue_name=queue_name,
prefetch=False,
timeout=self.worker.worker_timeout,
)
if not queue_name in self.worker.consumers:
# Если в воркере не прописан консьюмер то создаём его, с "потоком"
consumer_thread = _ConsumerThread(
broker=self,
queue_name=queue_name,
prefetch=False,
work_queue=self.worker.work_queue,
worker_timeout=self.worker.worker_timeout,
)
consumer_thread.consumer = consumer
self.worker.consumers[queue_name] = consumer_thread
for message in consumer:
# Обрабатываем каждое сообщение через work_queue
if message is not None:
actor = self.get_actor(message.actor_name)
self.worker.work_queue.put((actor.priority, message))
self.worker_thread.process_message(message)
else:
# Сообщения в очереди кончились
break
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment