Last active
August 29, 2015 14:25
-
-
Save gst/ad5dff15e2fe37e9b3dc to your computer and use it in GitHub Desktop.
Test case demonstrating a problem with amqpy's channel.returned_messages.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from queue import Empty
from threading import Thread, Lock
from time import sleep

import amqpy
from amqpy.message import Message

# Stress parameters: the higher these are set, the more likely the bad
# results (missed or duplicated returns) are to show up.
n_threads = 20
n_loop_per_thread = 100
# Author's observation: running with False produces far, far more missed
# returns than running with True.
use_channel_close_method = False

# One connection and one channel shared by the whole script.
conn = amqpy.Connection()
channel = conn.channel()

exchange = 'my_test_exchange'
channel.exchange_declare(exchange, 'fanout', auto_delete=False)
# channel.basic_recover(requeue=True)

queue = 'my_test_queue'
channel.queue_declare(queue)
channel.queue_bind(queue, exchange)

# Keys of published messages that were seen on the returned-messages queue
# more than once, respectively not at all.
multiple_returned = []
not_returned = []
def my_publish(key):
    """Publish *key* as a mandatory message and record how it was returned.

    The message is published to ``exchange`` with ``mandatory=True``; the
    script expects exactly one return per message. Afterwards the
    publishing channel's ``returned_messages`` queue is drained without
    blocking:

    * nothing returned -> ``key`` is appended to ``not_returned``;
    * each return beyond the first -> ``key`` is appended to
      ``multiple_returned``.

    When ``use_channel_close_method`` is true, a dedicated channel is opened
    for the publish and closed right after it; otherwise the shared
    module-level ``channel`` is used and ``conn.loop(0.0001)`` is called
    after publishing.

    :param key: message body; also the value recorded in the result lists.
    """
    if use_channel_close_method:
        ch = conn.channel()
        ch.exchange_declare(exchange, 'fanout', auto_delete=False)
    else:
        ch = channel

    ch.basic_publish(Message(key), exchange=exchange, mandatory=True)

    if use_channel_close_method:
        ch.close()
    else:
        conn.loop(0.0001)

    # Read returns from the channel that actually published. The publish was
    # switched to `ch`, but the original kept reading from the shared
    # `channel`, so in close-method mode the dedicated channel's returns were
    # never inspected. In the default mode `ch is channel`, so nothing changes.
    # NOTE(review): assumes amqpy queues a Basic.Return on the publishing
    # channel's own `returned_messages` -- confirm against the library.
    returned = ch.returned_messages

    n_returned = 0
    while True:
        try:
            returned.get_nowait()
        except Empty:
            break
        if n_returned:
            # bad: message was returned multiple times
            multiple_returned.append(key)
        n_returned += 1

    if not n_returned:
        # bad: message was not returned
        not_returned.append(key)
# Held around every my_publish call, so only one worker publishes at a time.
lock = Lock()


def producer(key):
    """Publish ``n_loop_per_thread`` messages labelled ``"<key>:<index>"``.

    Each publish is performed while holding the module-level ``lock``.

    :param key: identifier of the calling worker, used as the label prefix.
    """
    for index in range(n_loop_per_thread):
        label = "%s:%s" % (key, index)
        with lock:
            my_publish(label)
# Build the workers up front; they are started only after the queue is deleted.
threads = [
    Thread(target=producer, args=(worker_id,))
    for worker_id in range(n_threads)
]

# Delete the queue before any worker publishes.
channel.queue_delete(queue)
print("queue has been deleted, exchange should be bind to no queue by now .."
      "You can check that if you will..")
sleep(1)

for worker in threads:
    worker.start()
for worker in threads:
    worker.join()

# Report only the failure categories that actually occurred.
if multiple_returned:
    print("bad multiple returned:", multiple_returned)
if not_returned:
    print("bad some not_returned:", not_returned)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment