Skip to content

Instantly share code, notes, and snippets.

@nazavode
Last active March 15, 2016 14:51
Show Gist options
  • Save nazavode/97926f916798729c8e4e to your computer and use it in GitHub Desktop.
Save nazavode/97926f916798729c8e4e to your computer and use it in GitHub Desktop.
Multiple queue bindings for Kombu virtual transports.
# -*- coding: utf-8 -*-
import pytest
import timeit
# from duga import *
@pytest.fixture
def domain():
pass
def task_sum(*args):
return sum(*args)
class timed:
def __init__(self, title=None, echo=True):
self.title = title
self.echo = echo
def __enter__(self):
if self.echo:
print("### {}".format(self.title))
self.start = timeit.default_timer()
return self
def __exit__(self, *args):
self.end = timeit.default_timer()
self.elapsed = self.end - self.start
if self.echo:
print("### elapsed: {}".format(self.elapsed))
# -*- coding: utf-8 -*-
import pika
from fixtures import timed
AUTO_DELETE = True
DURABLE = False
NUM_OBSERVERS = 1
NUM_TASKS = 100000
def mock_task(task_name, exchange, channel):
channel.basic_publish(exchange=exchange, routing_key='{}.schedule'.format(task_name), body='{} scheduled'.format(task_name))
channel.basic_publish(exchange=exchange, routing_key='{}.start'.format(task_name), body='{} started'.format(task_name))
channel.basic_publish(exchange=exchange, routing_key='{}.running'.format(task_name), body='{} running'.format(task_name))
channel.basic_publish(exchange=exchange, routing_key='{}.done'.format(task_name), body='{} ended'.format(task_name))
def test_basic():
connection = pika.BlockingConnection()
channel = connection.channel()
channel.exchange_declare('ex.status', 'topic', durable=DURABLE, auto_delete=AUTO_DELETE)
channel.queue_declare('observer01', auto_delete=AUTO_DELETE, exclusive=True)
channel.queue_declare('logger', auto_delete=AUTO_DELETE, exclusive=True)
tasks = [
'task@' + str(i) for i in range(NUM_TASKS)
]
with timed('TASK CREATION'):
for task_id in tasks:
# Bind observer queue to task exchange:
channel.queue_bind(queue='observer01', exchange='ex.status', routing_key='{}.*'.format(task_id))
channel.queue_bind(queue='logger', exchange='ex.status', routing_key='#')
with timed('SENDING MESSAGES'):
for task_id in tasks:
mock_task(task_id, 'ex.status', channel)
with timed('RECEIVING MESSAGES #1'):
received = 0
for method_frame, properties, body in channel.consume('observer01', no_ack=True):
# print("MSG: {}, {}, {}".format(method_frame, properties, body))
received += 1
if received / 4 >= len(tasks):
break
with timed('CANCELLING CHANNEL #1'):
requeued_messages = channel.cancel()
print("Requeued messages: {}".format(requeued_messages))
with timed('RECEIVING MESSAGES #2'):
received = 0
for method_frame, properties, body in channel.consume('logger', no_ack=True):
# print("MSG: {}, {}, {}".format(method_frame, properties, body))
received += 1
if received / 4 >= len(tasks):
break
with timed('CANCELLING CHANNEL #2'):
requeued_messages = channel.cancel()
print("Requeued messages: {}".format(requeued_messages))
# with timed('DELETING QUEUES'):
# channel.queue_delete('observer01')
#
# with timed('DELETING EXCHANGES'):
# channel.exchange_delete('ex.status')
with timed('CLOSING CHANNEL'):
channel.close()
with timed('CLOSING CONNECTION'):
connection.close()
if __name__ == '__main__':
test_basic()
# -*- coding: utf-8 -*-
import pika
from fixtures import timed
AUTO_DELETE = True
DURABLE = False
def mock_task(task_name, exchange, channel):
channel.basic_publish(exchange=exchange, routing_key='{}.schedule'.format(task_name), body='{} scheduled'.format(task_name))
channel.basic_publish(exchange=exchange, routing_key='{}.start'.format(task_name), body='{} started'.format(task_name))
channel.basic_publish(exchange=exchange, routing_key='{}.running'.format(task_name), body='{} running'.format(task_name))
channel.basic_publish(exchange=exchange, routing_key='{}.done'.format(task_name), body='{} ended'.format(task_name))
def test_basic():
connection = pika.BlockingConnection()
channel = connection.channel()
channel.exchange_declare('ex.status', 'topic', durable=DURABLE, auto_delete=AUTO_DELETE)
channel.queue_declare('observer01', auto_delete=AUTO_DELETE, exclusive=True)
tasks = [
'task@' + str(i) for i in range(10)
]
task_exchanges = [
'ex.{}'.format(task_id) for task_id in tasks
]
with timed('TASK CREATION'):
for task_id, task_exchange in zip(tasks, task_exchanges):
# Create exchange:
channel.exchange_declare(task_exchange, 'fanout', durable=DURABLE, auto_delete=AUTO_DELETE)
# Bind task exchange to main exchange:
channel.exchange_bind(task_exchange, 'ex.status', '{}.*'.format(task_id))
# Bind observer queue to task exchange:
channel.queue_bind(queue='observer01', exchange=task_exchange)
with timed('SENDING MESSAGES'):
for task_id in tasks:
mock_task(task_id, 'ex.status', channel)
with timed('RECEIVING MESSAGES'):
for method_frame, properties, body in channel.consume('observer01', no_ack=True):
# print("MSG: {}, {}, {}".format(method_frame, properties, body))
if method_frame.delivery_tag / 4 >= len(tasks):
break
with timed('CANCELLING CHANNEL'):
requeued_messages = channel.cancel()
print("Requeued messages: {}".format(requeued_messages))
with timed('DELETING QUEUES'):
channel.queue_delete('observer01')
with timed('DELETING EXCHANGES'):
for task_exchange in task_exchanges:
channel.exchange_delete(task_exchange)
channel.exchange_delete('ex.status')
with timed('CLOSING CHANNEL'):
channel.close()
with timed('CLOSING CONNECTION'):
connection.close()
if __name__ == '__main__':
test_basic()
# -*- coding: utf-8 -*-
import kombu
# TRANSPORT = 'amqp://localhost'
TRANSPORT = 'sqla+sqlite:///KOMBU.sqlite'
# TRANSPORT = 'memory:///'
NUM_BINDINGS = 10
NUM_MESSAGES = 10
NUM_QUEUES = 3
EXPECTED_MESSAGES_COUNT = NUM_MESSAGES * NUM_BINDINGS
def test_multibind():
#
# Definitions
routing_keys = [
'route_{}'.format(i) for i in range(NUM_BINDINGS)
]
queues = [
kombu.Queue('queue_{}'.format(i)) for i in range(NUM_QUEUES)
]
exchange_def = kombu.Exchange('myex', type='topic')
#
# Connection
conn = kombu.Connection(TRANSPORT)
exchange = exchange_def(conn)
exchange.declare()
queues[:] = [q(conn) for q in queues]
for queue in queues:
queue.declare()
#
# Bind
for queue in queues:
for key in routing_keys:
queue.bind_to(exchange, routing_key=key)
#
# Publish
for key in routing_keys:
for msg_id in range(NUM_MESSAGES):
exchange.publish(
exchange.Message('msg{}@{}'.format(msg_id, key)),
routing_key=key
)
# return
#
# Consume
for queue in queues:
received = 0
while True:
msg = queue.get()
if msg is None:
break
received += 1
msg.ack() # remove message from queue
# Check
print("RECEIVED = {} # EXPECTED = {}".format(received, EXPECTED_MESSAGES_COUNT))
assert received == EXPECTED_MESSAGES_COUNT
# return
# Unbind
for queue in queues:
for key in routing_keys:
queue.unbind_from(exchange, routing_key=key)
# Publish
for key in routing_keys:
for msg_id in range(NUM_MESSAGES):
exchange.publish(
exchange.Message('msg{}@{}'.format(msg_id, key)),
routing_key=key
)
# Consume
for queue in queues:
received = 0
while True:
msg = queue.get()
if msg is None:
break
received += 1
msg.ack() # remove message from queue
# Check
print("RECEIVED = {} # EXPECTED = {}".format(received, 0))
assert received == 0
# Cleanup
for queue in queues:
queue.delete()
exchange.delete()
conn.close()
if __name__ == '__main__':
test_multibind()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment