Skip to content

Instantly share code, notes, and snippets.

@pfreixes
Last active August 29, 2015 14:25
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save pfreixes/05e195825c7d699b4003 to your computer and use it in GitHub Desktop.
Save pfreixes/05e195825c7d699b4003 to your computer and use it in GitHub Desktop.
1K pika consumers, asyncronous vs thread

All consumers consuming over 1K queues, growing from 1 to 1K messages.

consumer_threaded_pika.py

import pika
import threading
import sys
import time
import os
import atexit

from functools import partial
from datetime import datetime

# global variable to save all consumer instances
consumers = []

def print_deltas():
    """ Caled before exit the program, print all elapsed times """
    for consumer in consumers:
        print consumer.delta

atexit.register(print_deltas)

class Consumer(threading.Thread):
    def __init__(self, *args, **kwargs):
        self._deltas = []
        self._rx = 0
        self._messages = kwargs["messages"]
        self._thread_number = kwargs["thread_number"]
        kwargs.pop("thread_number")
        kwargs.pop("messages")
        threading.Thread.__init__(self, *args, **kwargs)

    @property
    def delta(self):
        return min(self._deltas)

    def callback(self, channel, method, properties, message):
        ts = float(message)
        self._deltas.append(time.time() - ts) # how long was take get this message
        self.channel.basic_ack(delivery_tag=method.delivery_tag)
        self._rx +=1
        if self._rx == self._messages:
           # cancel de loop and leave the thread
           self.channel.stop_consuming()

    def run(self):
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(
            host='localhost'))
        self.channel = self.connection.channel()
        self.channel.basic_qos(prefetch_size=0, prefetch_count=1, all_channels=True)
        self.channel.basic_consume(self.callback, queue="test.{}".format(self._thread_number))
        self.channel.start_consuming()

try:
    messages = int(sys.argv[1])
except IndexError:
    messages = 1
    
# up all threads
for thread in range(1, 1001):
    consumer = Consumer(messages=messages, thread_number=thread, name="thread-{}".format(thread))
    consumer.start()
    consumers.append(consumer)

# wait until all threads consume one messges and quit by it self. Then the print_elapsed function
# will display the elapsed times

(1, 10, 100, 1K) messages

Amount of time nedeed to consume all messages from all queues

3.417 5.722 28.310 4m25.676

Messages per second consumed

292 1747 3532 3773

consumer_asyncronous_pika.py

import pika
import sys
import time

from functools import partial


class Consumer(object):

    def __init__(self, ioloop, ident, messages, end_consumers):
        self._ioloop = ioloop
        self._end_consumers = end_consumers
        self._connection = None
        self._channel = None
        self._closing = False
        self._consumer_tag = None
        self._ident = ident
        self._messages = messages
        self._rx = 0
        self.delta = -1   # enough odd
        self._connection = self.connect()

    def connect(self):
        return pika.SelectConnection(pika.ConnectionParameters(host='localhost', socket_timeout=1000),
                                     self.on_connection_open,
                                     custom_ioloop=self._ioloop,
                                     stop_ioloop_on_close=False)

    def close_connection(self):
        self._connection.close()

    def on_connection_closed(self, connection, reply_code, reply_text):
        self._channel = None
        self._connection.ioloop.stop()

    def on_connection_open(self, unused_connection):
        self._connection.add_on_close_callback(self.on_connection_closed)
        self._connection.channel(on_open_callback=self.on_channel_open)


    def on_channel_closed(self, channel, reply_code, reply_text):
        self._connection.close()

    def on_channel_open(self, channel):
        self._channel = channel
        self._channel.add_on_close_callback(self.on_channel_closed)
        self._channel.basic_qos(prefetch_size=0, prefetch_count=1, all_channels=True)
        self._channel.add_on_cancel_callback(self.on_consumer_cancelled)
        self._consumer_tag = self._channel.basic_consume(self.on_message,
                                                         "test.{}".format(self._ident))

    def on_consumer_cancelled(self, method_frame):
        if self._channel:
            self._channel.close()

    def on_message(self, unused_channel, basic_deliver, properties, message):
        ts = float(message)
        self.delta = (time.time()) - ts  # how long was take
        self._channel.basic_ack(basic_deliver.delivery_tag)
        self._rx +=1
        if self._rx == self._messages:
           self._end_consumers[self._ident - 1] = True
           if all(self._end_consumers):
               self._ioloop.stop()


    def on_cancelok(self, unused_frame):
        self._channel.close()

        
    def stop_consuming(self):
        if self._channel:
            self._channel.basic_cancel(self.on_cancelok, self._consumer_tag)


try:
    messages = int(sys.argv[1])
except IndexError:
    messages = 1

consumers = []
end_consumers = [False] * 1000
ioloop = pika.adapters.select_connection.IOLoop()

# up all consumers
for ident in range(1, 1001):
    consumer = Consumer(ioloop, ident, messages, end_consumers)
    consumers.append(consumer)

ioloop.start()

for consumer in consumers:
    print consumer.delta

1K queue, (1, 10, 100, 1K) messages

Amount of time nedeed to consume all messages from all queues

3.672 4.556 14.546 4m25.676 1m58.967

Messages per second consumed

272 2194 6874 8478 ~

@pfreixes
Copy link
Author

It breaks the theory that says that in environs with short latency threaded architectures have better response time, still written in Python, and are more effective than the asynchronous ones.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment