Last active
March 15, 2016 14:51
-
-
Save nazavode/97926f916798729c8e4e to your computer and use it in GitHub Desktop.
Multiple queue bindings for Kombu virtual transports.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
import pytest | |
import timeit | |
# from duga import * | |
@pytest.fixture | |
def domain(): | |
pass | |
def task_sum(*args): | |
return sum(*args) | |
class timed: | |
def __init__(self, title=None, echo=True): | |
self.title = title | |
self.echo = echo | |
def __enter__(self): | |
if self.echo: | |
print("### {}".format(self.title)) | |
self.start = timeit.default_timer() | |
return self | |
def __exit__(self, *args): | |
self.end = timeit.default_timer() | |
self.elapsed = self.end - self.start | |
if self.echo: | |
print("### elapsed: {}".format(self.elapsed)) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
import pika | |
from fixtures import timed | |
AUTO_DELETE = True | |
DURABLE = False | |
NUM_OBSERVERS = 1 | |
NUM_TASKS = 100000 | |
def mock_task(task_name, exchange, channel): | |
channel.basic_publish(exchange=exchange, routing_key='{}.schedule'.format(task_name), body='{} scheduled'.format(task_name)) | |
channel.basic_publish(exchange=exchange, routing_key='{}.start'.format(task_name), body='{} started'.format(task_name)) | |
channel.basic_publish(exchange=exchange, routing_key='{}.running'.format(task_name), body='{} running'.format(task_name)) | |
channel.basic_publish(exchange=exchange, routing_key='{}.done'.format(task_name), body='{} ended'.format(task_name)) | |
def test_basic(): | |
connection = pika.BlockingConnection() | |
channel = connection.channel() | |
channel.exchange_declare('ex.status', 'topic', durable=DURABLE, auto_delete=AUTO_DELETE) | |
channel.queue_declare('observer01', auto_delete=AUTO_DELETE, exclusive=True) | |
channel.queue_declare('logger', auto_delete=AUTO_DELETE, exclusive=True) | |
tasks = [ | |
'task@' + str(i) for i in range(NUM_TASKS) | |
] | |
with timed('TASK CREATION'): | |
for task_id in tasks: | |
# Bind observer queue to task exchange: | |
channel.queue_bind(queue='observer01', exchange='ex.status', routing_key='{}.*'.format(task_id)) | |
channel.queue_bind(queue='logger', exchange='ex.status', routing_key='#') | |
with timed('SENDING MESSAGES'): | |
for task_id in tasks: | |
mock_task(task_id, 'ex.status', channel) | |
with timed('RECEIVING MESSAGES #1'): | |
received = 0 | |
for method_frame, properties, body in channel.consume('observer01', no_ack=True): | |
# print("MSG: {}, {}, {}".format(method_frame, properties, body)) | |
received += 1 | |
if received / 4 >= len(tasks): | |
break | |
with timed('CANCELLING CHANNEL #1'): | |
requeued_messages = channel.cancel() | |
print("Requeued messages: {}".format(requeued_messages)) | |
with timed('RECEIVING MESSAGES #2'): | |
received = 0 | |
for method_frame, properties, body in channel.consume('logger', no_ack=True): | |
# print("MSG: {}, {}, {}".format(method_frame, properties, body)) | |
received += 1 | |
if received / 4 >= len(tasks): | |
break | |
with timed('CANCELLING CHANNEL #2'): | |
requeued_messages = channel.cancel() | |
print("Requeued messages: {}".format(requeued_messages)) | |
# with timed('DELETING QUEUES'): | |
# channel.queue_delete('observer01') | |
# | |
# with timed('DELETING EXCHANGES'): | |
# channel.exchange_delete('ex.status') | |
with timed('CLOSING CHANNEL'): | |
channel.close() | |
with timed('CLOSING CONNECTION'): | |
connection.close() | |
if __name__ == '__main__': | |
test_basic() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
import pika | |
from fixtures import timed | |
AUTO_DELETE = True | |
DURABLE = False | |
def mock_task(task_name, exchange, channel): | |
channel.basic_publish(exchange=exchange, routing_key='{}.schedule'.format(task_name), body='{} scheduled'.format(task_name)) | |
channel.basic_publish(exchange=exchange, routing_key='{}.start'.format(task_name), body='{} started'.format(task_name)) | |
channel.basic_publish(exchange=exchange, routing_key='{}.running'.format(task_name), body='{} running'.format(task_name)) | |
channel.basic_publish(exchange=exchange, routing_key='{}.done'.format(task_name), body='{} ended'.format(task_name)) | |
def test_basic(): | |
connection = pika.BlockingConnection() | |
channel = connection.channel() | |
channel.exchange_declare('ex.status', 'topic', durable=DURABLE, auto_delete=AUTO_DELETE) | |
channel.queue_declare('observer01', auto_delete=AUTO_DELETE, exclusive=True) | |
tasks = [ | |
'task@' + str(i) for i in range(10) | |
] | |
task_exchanges = [ | |
'ex.{}'.format(task_id) for task_id in tasks | |
] | |
with timed('TASK CREATION'): | |
for task_id, task_exchange in zip(tasks, task_exchanges): | |
# Create exchange: | |
channel.exchange_declare(task_exchange, 'fanout', durable=DURABLE, auto_delete=AUTO_DELETE) | |
# Bind task exchange to main exchange: | |
channel.exchange_bind(task_exchange, 'ex.status', '{}.*'.format(task_id)) | |
# Bind observer queue to task exchange: | |
channel.queue_bind(queue='observer01', exchange=task_exchange) | |
with timed('SENDING MESSAGES'): | |
for task_id in tasks: | |
mock_task(task_id, 'ex.status', channel) | |
with timed('RECEIVING MESSAGES'): | |
for method_frame, properties, body in channel.consume('observer01', no_ack=True): | |
# print("MSG: {}, {}, {}".format(method_frame, properties, body)) | |
if method_frame.delivery_tag / 4 >= len(tasks): | |
break | |
with timed('CANCELLING CHANNEL'): | |
requeued_messages = channel.cancel() | |
print("Requeued messages: {}".format(requeued_messages)) | |
with timed('DELETING QUEUES'): | |
channel.queue_delete('observer01') | |
with timed('DELETING EXCHANGES'): | |
for task_exchange in task_exchanges: | |
channel.exchange_delete(task_exchange) | |
channel.exchange_delete('ex.status') | |
with timed('CLOSING CHANNEL'): | |
channel.close() | |
with timed('CLOSING CONNECTION'): | |
connection.close() | |
if __name__ == '__main__': | |
test_basic() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
import kombu | |
# TRANSPORT = 'amqp://localhost' | |
TRANSPORT = 'sqla+sqlite:///KOMBU.sqlite' | |
# TRANSPORT = 'memory:///' | |
NUM_BINDINGS = 10 | |
NUM_MESSAGES = 10 | |
NUM_QUEUES = 3 | |
EXPECTED_MESSAGES_COUNT = NUM_MESSAGES * NUM_BINDINGS | |
def test_multibind(): | |
# | |
# Definitions | |
routing_keys = [ | |
'route_{}'.format(i) for i in range(NUM_BINDINGS) | |
] | |
queues = [ | |
kombu.Queue('queue_{}'.format(i)) for i in range(NUM_QUEUES) | |
] | |
exchange_def = kombu.Exchange('myex', type='topic') | |
# | |
# Connection | |
conn = kombu.Connection(TRANSPORT) | |
exchange = exchange_def(conn) | |
exchange.declare() | |
queues[:] = [q(conn) for q in queues] | |
for queue in queues: | |
queue.declare() | |
# | |
# Bind | |
for queue in queues: | |
for key in routing_keys: | |
queue.bind_to(exchange, routing_key=key) | |
# | |
# Publish | |
for key in routing_keys: | |
for msg_id in range(NUM_MESSAGES): | |
exchange.publish( | |
exchange.Message('msg{}@{}'.format(msg_id, key)), | |
routing_key=key | |
) | |
# return | |
# | |
# Consume | |
for queue in queues: | |
received = 0 | |
while True: | |
msg = queue.get() | |
if msg is None: | |
break | |
received += 1 | |
msg.ack() # remove message from queue | |
# Check | |
print("RECEIVED = {} # EXPECTED = {}".format(received, EXPECTED_MESSAGES_COUNT)) | |
assert received == EXPECTED_MESSAGES_COUNT | |
# return | |
# Unbind | |
for queue in queues: | |
for key in routing_keys: | |
queue.unbind_from(exchange, routing_key=key) | |
# Publish | |
for key in routing_keys: | |
for msg_id in range(NUM_MESSAGES): | |
exchange.publish( | |
exchange.Message('msg{}@{}'.format(msg_id, key)), | |
routing_key=key | |
) | |
# Consume | |
for queue in queues: | |
received = 0 | |
while True: | |
msg = queue.get() | |
if msg is None: | |
break | |
received += 1 | |
msg.ack() # remove message from queue | |
# Check | |
print("RECEIVED = {} # EXPECTED = {}".format(received, 0)) | |
assert received == 0 | |
# Cleanup | |
for queue in queues: | |
queue.delete() | |
exchange.delete() | |
conn.close() | |
if __name__ == '__main__': | |
test_multibind() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment