All consumers consuming over 1K queues, growing from 1 to 1K messages.
import pika
import threading
import sys
import time
import os
import atexit
from functools import partial
from datetime import datetime
# global variable to save all consumer instances
consumers = []
def print_deltas():
""" Caled before exit the program, print all elapsed times """
for consumer in consumers:
print consumer.delta
atexit.register(print_deltas)
class Consumer(threading.Thread):
def __init__(self, *args, **kwargs):
self._deltas = []
self._rx = 0
self._messages = kwargs["messages"]
self._thread_number = kwargs["thread_number"]
kwargs.pop("thread_number")
kwargs.pop("messages")
threading.Thread.__init__(self, *args, **kwargs)
@property
def delta(self):
return min(self._deltas)
def callback(self, channel, method, properties, message):
ts = float(message)
self._deltas.append(time.time() - ts) # how long was take get this message
self.channel.basic_ack(delivery_tag=method.delivery_tag)
self._rx +=1
if self._rx == self._messages:
# cancel de loop and leave the thread
self.channel.stop_consuming()
def run(self):
self.connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
self.channel = self.connection.channel()
self.channel.basic_qos(prefetch_size=0, prefetch_count=1, all_channels=True)
self.channel.basic_consume(self.callback, queue="test.{}".format(self._thread_number))
self.channel.start_consuming()
try:
messages = int(sys.argv[1])
except IndexError:
messages = 1
# up all threads
for thread in range(1, 1001):
consumer = Consumer(messages=messages, thread_number=thread, name="thread-{}".format(thread))
consumer.start()
consumers.append(consumer)
# wait until all threads consume one messges and quit by it self. Then the print_elapsed function
# will display the elapsed times
(1, 10, 100, 1K) messages
Amount of time nedeed to consume all messages from all queues
3.417 5.722 28.310 4m25.676
Messages per second consumed
292 1747 3532 3773
import pika
import sys
import time
from functools import partial
class Consumer(object):
def __init__(self, ioloop, ident, messages, end_consumers):
self._ioloop = ioloop
self._end_consumers = end_consumers
self._connection = None
self._channel = None
self._closing = False
self._consumer_tag = None
self._ident = ident
self._messages = messages
self._rx = 0
self.delta = -1 # enough odd
self._connection = self.connect()
def connect(self):
return pika.SelectConnection(pika.ConnectionParameters(host='localhost', socket_timeout=1000),
self.on_connection_open,
custom_ioloop=self._ioloop,
stop_ioloop_on_close=False)
def close_connection(self):
self._connection.close()
def on_connection_closed(self, connection, reply_code, reply_text):
self._channel = None
self._connection.ioloop.stop()
def on_connection_open(self, unused_connection):
self._connection.add_on_close_callback(self.on_connection_closed)
self._connection.channel(on_open_callback=self.on_channel_open)
def on_channel_closed(self, channel, reply_code, reply_text):
self._connection.close()
def on_channel_open(self, channel):
self._channel = channel
self._channel.add_on_close_callback(self.on_channel_closed)
self._channel.basic_qos(prefetch_size=0, prefetch_count=1, all_channels=True)
self._channel.add_on_cancel_callback(self.on_consumer_cancelled)
self._consumer_tag = self._channel.basic_consume(self.on_message,
"test.{}".format(self._ident))
def on_consumer_cancelled(self, method_frame):
if self._channel:
self._channel.close()
def on_message(self, unused_channel, basic_deliver, properties, message):
ts = float(message)
self.delta = (time.time()) - ts # how long was take
self._channel.basic_ack(basic_deliver.delivery_tag)
self._rx +=1
if self._rx == self._messages:
self._end_consumers[self._ident - 1] = True
if all(self._end_consumers):
self._ioloop.stop()
def on_cancelok(self, unused_frame):
self._channel.close()
def stop_consuming(self):
if self._channel:
self._channel.basic_cancel(self.on_cancelok, self._consumer_tag)
try:
messages = int(sys.argv[1])
except IndexError:
messages = 1
consumers = []
end_consumers = [False] * 1000
ioloop = pika.adapters.select_connection.IOLoop()
# up all consumers
for ident in range(1, 1001):
consumer = Consumer(ioloop, ident, messages, end_consumers)
consumers.append(consumer)
ioloop.start()
for consumer in consumers:
print consumer.delta
1K queue, (1, 10, 100, 1K) messages
Amount of time nedeed to consume all messages from all queues
3.672 4.556 14.546 4m25.676 1m58.967
Messages per second consumed
272 2194 6874 8478 ~
It breaks the theory that says that in environs with short latency threaded architectures have better response time, still written in Python, and are more effective than the asynchronous ones.