Skip to content

Instantly share code, notes, and snippets.

@lukebakken
Forked from sjlongland/test.sh
Created February 14, 2018 01:09
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save lukebakken/ab49531e504dfd8f4c133fcb9994a7d7 to your computer and use it in GitHub Desktop.
Save lukebakken/ab49531e504dfd8f4c133fcb9994a7d7 to your computer and use it in GitHub Desktop.
pika test case: channel close bug
#!/bin/sh -ex
: ${PYTHON2:=$( which python2 )}
: ${PYTHON3:=$( which python3 )}
pass=""
fail=""
for python in ${PYTHON2} ${PYTHON3}; do
for pika in 0.9.14 0.10.0 0.11.2; do
ver=$( ${python} --version 2>&1 | cut -f 2 -d' ' )
virtenv=${PWD}/py${ver}-pika${pika}
${python} -m virtualenv ${virtenv}
${virtenv}/bin/pip install pika==${pika} tornado
if ${virtenv}/bin/python testpika.py > test-py${ver}-pika${pika}.log 2>&1 ; then
pass="${pass} py${ver}-pika${pika}"
else
fail="${fail} py${ver}-pika${pika}"
cat test-py${ver}-pika${pika}.log
fi
done
done
set +x
echo "PASS: ${pass}"
echo "FAIL: ${fail}"
if [ -n "${fail}" ]; then
exit 1
fi
import logging
import functools
import sys
from sys import stdout
from sys import version as PYTHON_VERSION
from tornado.ioloop import IOLoop
from pika.adapters import TornadoConnection
from pika.connection import URLParameters
from pika import __version__ as PIKA_VERSION
class PikaTestCase(object):
"""
Our test case for Pika handling channel close events.
"""
AMQP_URI = 'amqp://guest:guest@localhost:5672/%2f'
def __init__(self, num_channels, io_loop=None):
if io_loop is None:
io_loop = IOLoop.current()
self._num_channels = num_channels + 1 # Increment by 1 since we're starting at index 1
self._io_loop = io_loop
self._connection = None
self._channels = {}
self._all_channels_created = False
self._test_done = False
self._declared = set()
self._declaring = set()
self._ch_cycles = {}
self._watchdog = None
io_loop.add_callback(self._connect)
def _force_shutdown():
logging.error('Test timed out')
self._set_result('Test timed out')
self._io_loop.stop()
self._io_loop.add_timeout(
self._io_loop.time() + 30, _force_shutdown)
self.result = None
def _set_result(self, result):
if self.result is None:
logging.info('Storing result: %s', result)
self.result = result
def _connect(self):
try:
logging.info('Opening connection to broker')
self._connection = TornadoConnection(
parameters=URLParameters(self.AMQP_URI),
on_open_callback=self._on_conn_open,
on_open_error_callback=self._on_conn_open_err,
on_close_callback=self._on_conn_close,
custom_ioloop=self._io_loop)
except:
logging.exception('Failed to connect')
self._set_result('Failed to connect')
self._io_loop.stop()
def _on_conn_open_err(self, *args, **kwargs):
logging.error('Failed to open connection; handler called with '\
'args=%s, kwargs=%s', args, kwargs)
self._io_loop.stop()
def _on_conn_close(self, connection, res_code, res_reason):
logging.info('Connection closed; code %s reason %s',
res_code, res_reason)
if res_code not in (0, 200):
self._set_result('CLOSED: %s %s' % (res_code, res_reason))
self._io_loop.stop()
def _on_conn_open(self, *args, **kwargs):
logging.info('Connection opened; handler called with '\
'args=%s, kwargs=%s', args, kwargs)
# Now, declare some channels
for ch_idx in range(1, self._num_channels):
self._ch_cycles[ch_idx] = 3
cb = functools.partial(self._on_chan_open, ch_idx)
self._connection.channel(on_open_callback=cb)
def _on_chan_open(self, ch_idx, channel):
logging.info('Channel %s opened (_all_channels_created=%s)', ch_idx, self._all_channels_created)
if self._test_done:
logging.info('Closing newly opened channel')
channel.close()
return
self._channels[ch_idx] = channel
cb = functools.partial(self._on_chan_close, ch_idx)
channel.add_on_close_callback(cb)
if self._all_channels_created:
self._io_loop.add_callback(self._run_test)
else:
self._io_loop.add_callback(self._check_channels)
def _check_channels(self):
# Do we have all channels yet?
if set(self._channels.keys()) == set(range(1, self._num_channels)):
logging.info('All channels created')
self._all_channels_created = True
self._io_loop.add_callback(self._run_test)
else:
self._all_channels_created = False
def _watchdog_timeout(self):
logging.error('Watchdog timeout!')
self._set_result('WATCHDOG TIMEOUT')
self._io_loop.add_callback(self._stop_test)
def _run_test(self):
# Try passively declaring exchanges.
# One will exist; two will not. Let's assume the use case for this is
# that a daemon running elsewhere creates the other two exchanges, and
# that daemon is down (maybe booting up, maybe crashed). We need *our*
# daemon to remain connected and keep retrying until it succeeds.
if self._test_done:
logging.info('Run cancelled due to test shutdown')
return
logging.info('Running test')
# Reset watchdog
if self._watchdog is not None:
self._io_loop.remove_timeout(self._watchdog)
wd_time = self._io_loop.time() + 10
self._watchdog = self._io_loop.add_timeout(wd_time, self._watchdog_timeout)
for ch_idx, channel in self._channels.items():
if ch_idx == 1:
ex_name = 'amq.fanout'
else:
ex_name = 'ex_%s' % ch_idx
if (ch_idx not in self._declared) and (ch_idx not in self._declaring):
logging.info("Passively declaring exchange '%s' in channel '%s'", ex_name, ch_idx)
self._declaring.add(ch_idx)
# This should succeed for idx 0,
# fail for others
cb = callback=functools.partial(self._on_exchange_declared, ch_idx, ex_name)
channel.exchange_declare(callback=cb, exchange=ex_name, exchange_type='fanout', passive=True)
def _on_exchange_declared(self, ch_idx, ex_name, *args, **kwargs):
logging.info('Channel %s declared exchange %s; handler called with '\
'args=%s, kwargs=%s', ch_idx, ex_name, args, kwargs)
self._declaring.discard(ch_idx)
self._declared.add(ch_idx)
def _on_chan_close(self, ch_idx, channel, reply_code, reply_text):
if channel.is_open:
logging.error('Channel %s should be closed, but is open?!?', ch_idx)
channel.close()
return
logging.info('Channel %s closed with reply code %s, '\
'reason %s, cycles %s',
ch_idx, reply_code, reply_text, self._ch_cycles.get(ch_idx))
self._all_channels_created = False
self._declaring.discard(ch_idx)
self._declared.discard(ch_idx)
self._channels.pop(ch_idx)
ch_num = channel.channel_number
if self._test_done:
# Are all channels closed?
if len(self._channels) == 0:
self._io_loop.add_callback(self._channels_closed)
else:
key_str = ' '.join(str(k) for k in self._channels.keys())
logging.info('Waiting for other channels to close: %s', key_str)
elif self._ch_cycles.get(ch_idx, 0) > 0:
# Try to re-open the channel, re-cycling the channel number
def _reopen():
logging.info('Re-opening channel %s (AMQP ch %s)', ch_idx, ch_num)
self._ch_cycles[ch_idx] -= 1
cb = on_open_callback=functools.partial(self._on_chan_open, ch_idx)
self._connection.channel(channel_number=ch_num, on_open_callback=cb)
self._io_loop.add_callback(_reopen)
else:
# Exhausted attempts for one channel, stopping test
if self._test_done:
logging.info('Channel %s (AMQP ch %s) exhausted attempts, test already stopped', ch_idx, ch_num)
else:
logging.info('Channel %s (AMQP ch %s) exhausted attempts, STOPPING TEST', ch_idx, ch_num)
self._io_loop.add_callback(self._stop_test)
def _stop_test(self):
if self._test_done:
logging.info('Test already stopped, not closing channels')
return
else:
logging.info('Stopping test and closing channels')
self._test_done = True
def _force_disconnect():
logging.error('force disconnect: channel clean-up timed out')
self._set_result('force disconnect: channel clean-up timed out')
self._io_loop.add_callback(self._channels_closed)
t = self._io_loop.time() + 10
self._io_loop.add_timeout(t, _force_disconnect)
for ch_idx, channel in self._channels.items():
ch_num = channel.channel_number
if channel.is_open:
logging.info('Closing channel %s (AMQP ch %s)', ch_idx, ch_num)
channel.close(reply_code=ch_idx)
else:
logging.warning('Channel %s (AMQP ch %s) is no longer open, not closing', ch_idx, ch_num)
def _channels_closed(self):
logging.info('Disconnecting')
self._connection.close()
if __name__ == '__main__':
num_channels = int(sys.argv[1])
logging.basicConfig(level=logging.DEBUG, stream=stdout,
format='%(asctime)s %(levelname)10s '\
'%(name)16s: %(message)s')
logging.info('Starting up.\nPython: %s\nPika: %s', PYTHON_VERSION, PIKA_VERSION)
try:
testcase = PikaTestCase(num_channels)
IOLoop.current().start()
assert testcase.result is None, 'Failed with result: %s' % testcase.result
except:
logging.exception('Test aborted')
raise
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment