Skip to content

Instantly share code, notes, and snippets.

@gst
Last active August 29, 2015 14:25
Show Gist options
  • Save gst/ad5dff15e2fe37e9b3dc to your computer and use it in GitHub Desktop.
Save gst/ad5dff15e2fe37e9b3dc to your computer and use it in GitHub Desktop.
test case showing problem with channel.returned_messages
from queue import Empty
from threading import Thread, Lock
from time import sleep
import amqpy
from amqpy.message import Message
# the more you set: the more probably you'll get the bad results:
n_threads = 20
n_loop_per_thread = 100
use_channel_close_method = False # far, far, far more miss than with True
conn = amqpy.Connection()
channel = conn.channel()
exchange = 'my_test_exchange'
channel.exchange_declare(exchange, 'fanout', auto_delete=False)
# channel.basic_recover(requeue=True)
queue = 'my_test_queue'
channel.queue_declare(queue)
channel.queue_bind(queue, exchange)
multiple_returned = []
not_returned = []
def my_publish(key):
if use_channel_close_method:
ch = conn.channel()
ch.exchange_declare(exchange, 'fanout', auto_delete=False)
else:
ch = channel
msg = Message(key)
# EDITED: s/channel/ch/ :
ch.basic_publish(msg, exchange=exchange, mandatory=True)
if use_channel_close_method:
ch.close()
else:
conn.loop(0.0001)
n_returned = 0
while True:
try:
ret = channel.returned_messages.get_nowait()
if n_returned:
# bad: message was returned multiple times !
multiple_returned.append(key)
else:
pass # good: message was returned as expected
n_returned += 1
except Empty:
if not n_returned:
# bad: message was not returned
not_returned.append(key)
break
lock = Lock()
def producer(key):
for i in range(n_loop_per_thread):
with lock:
my_publish("%s:%s" % (key, i))
threads = [Thread(target=producer, args=(i,)) for i in range(n_threads)]
channel.queue_delete(queue)
print("queue has been deleted, exchange should be bind to no queue by now .."
"You can check that if you will..")
sleep(1)
for t in threads:
t.start()
for t in threads:
t.join()
if multiple_returned != []:
print("bad multiple returned:", multiple_returned)
if not_returned != []:
print("bad some not_returned:", not_returned)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment