Skip to content

Instantly share code, notes, and snippets.

@sjlongland
Last active November 14, 2018 14:13
Show Gist options
  • Star 11 You must be signed in to star a gist
  • Fork 3 You must be signed in to fork a gist
  • Save sjlongland/3b89ce91d065851b292a to your computer and use it in GitHub Desktop.
Save sjlongland/3b89ce91d065851b292a to your computer and use it in GitHub Desktop.
Tornado Coroutine interface to AMQP (pika)
#!/usr/bin/python
import datetime
import logging
import threading
import weakref
import tornado
import tornado.gen
import pika
'''
Pika-Tornado: a Tornado coroutine-based abstraction layer for Pika.
'''
# (C) 2015 Stuart Longland <stuartl@longlandclan.yi.org>
# Released under the terms of the Mozilla Public License v2.0
class AMQPObject(object):
@classmethod
def _get_log(cls, *name):
return logging.getLogger('.'.join(
(cls.__module__, cls.__name__) + name))
class AMQPConnection(AMQPObject):
'''
Connection to AMQP, this object handles unexpected disconnections (and
re-connects accordingly) and wraps the channel opening process in a
coroutine-friendly manner.
Support for multiple IOLoops is provided and thread-safety is attempted.
Callbacks may be specified in a number of ways:
- as a reference to the function directly
- as a tuple:
- first element gives the IOLoop (or None for the client's IOLoop)
- second element gives the reference to the function
- third element if present gives the initial args
- forth element if present gives the initial kwargs
'''
CONNECTION_CLOSED = pika.connection.Connection.CONNECTION_CLOSED
CONNECTION_INIT = pika.connection.Connection.CONNECTION_INIT
CONNECTION_PROTOCOL = pika.connection.Connection.CONNECTION_PROTOCOL
CONNECTION_START = pika.connection.Connection.CONNECTION_START
CONNECTION_TUNE = pika.connection.Connection.CONNECTION_TUNE
CONNECTION_OPEN = pika.connection.Connection.CONNECTION_OPEN
CONNECTION_CLOSING = pika.connection.Connection.CONNECTION_CLOSING
def __init__(self, parameters, on_open_callback=None,
on_open_error_callback=None, on_close_callback=None,
on_giveup_callback=None, reconnect_delay=5.0, reconnect_max=-1,
io_loop=None, io_thread=None):
'''
Initialise a connection to AMQP.
:param: pika.connection.Parameters parameters:
Connection parameters, see Pika documentation.
:param method on_open_callback:
Called when the connection is opened.
:param method on_open_error_callback:
Called if the connection can't be opened.
:param method on_close_callback:
Called when the connection is closed.
:param method on_giveup_callback:
Called when the connection attempts have been exhausted
and this library "gives up". This might be used to log
an alarm and shut down the daemon.
:param float reconnect_delay:
The amount of time to wait before attempting to reconnect.
Set to 0 to disable reconnect.
:param int reconnect_max:
The maximum number of reconnection attempts. Set to 0 to
permit unlimited reconnections.
:param tornado.ioloop.IOLoop io_loop:
IOLoop instance to use for the AMQP client itself. By default,
the current IOLoop is used.
:param threading.Thread io_thread:
Thread used for AMQP communications. Since Pika itself is not
thread-safe, it is imperative that all AMQP operations take place
in the same thread as the AMQP IOLoop instance.
If this is different to the thread the constructor is running in
for whatever reason, the reference to that thread can be given
here.
'''
if io_loop is None:
io_loop = tornado.ioloop.IOLoop.current()
if io_thread is None:
io_thread = threading.current_thread()
self._io_loop = io_loop
self._io_thread = io_thread
self._reconnect_delay = datetime.timedelta(seconds=reconnect_delay)
self._reconnect_max = reconnect_max
self._reconnect_rem = reconnect_max
self._reconnect = reconnect_delay > 0
self._giveup = False
self._on_open_cb = []
self._on_open_error_cb = []
self._on_close_cb = []
self._on_giveup_cb = []
if on_open_callback is not None:
self._on_open_cb.append(
self._get_callback(on_open_callback))
if on_open_error_callback is not None:
self._on_open_error_cb.append(
self._get_callback(on_open_error_callback))
if on_close_callback is not None:
self._on_close_cb.append(
self._get_callback(on_close_callback))
if on_giveup_callback is not None:
self._on_giveup_cb.append(
self._get_callback(on_giveup_callback))
self._amqp_parameters = parameters
self._amqp_connection = None
self._channels = []
self._replace_connection()
# ------------------------------------------------------------------------
# Connection state
# ------------------------------------------------------------------------
connection_state = property(lambda s : \
s._amqp_connection.connection_state)
is_closed = property(lambda s : s._amqp_connection.is_closed)
is_closing = property(lambda s : s._amqp_connection.is_closing)
is_open = property(lambda s : s._amqp_connection.is_open)
# ------------------------------------------------------------------------
# Connection callbacks
# ------------------------------------------------------------------------
def add_on_open_callback(self, callback, callback_args=None,
callback_kwargs=None):
self._on_open_cb.append(
self._get_callback(callback, callback_args, callback_kwargs))
def add_on_open_error_callback(self, callback, callback_args=None,
callback_kwargs=None):
self._on_open_error_cb.append(
self._get_callback(callback, callback_args, callback_kwargs))
def add_on_close_callback(self, callback, callback_args=None,
callback_kwargs=None):
self._on_close_cb.append(
self._get_callback(callback, callback_args, callback_kwargs))
def add_on_giveup_callback(self, callback, callback_args=None,
callback_kwargs=None):
self._on_giveup.append(
self._get_callback(callback, callback_args, callback_kwargs))
# ------------------------------------------------------------------------
# Connection management
# ------------------------------------------------------------------------
def _replace_connection(self):
log = self._get_log('_replace_connection')
try:
log.debug('Gathering a list of existing channels')
# Grab a list of channels
channels = self._get_channels()
log.debug( 'We have %s channels to reopen. Opening '\
'connection to AMQP with parameters %s.',
len(channels), self._amqp_parameters)
# Replace the connection
self._amqp_connection = pika.adapters.TornadoConnection(
parameters = self._amqp_parameters,
on_open_callback = self._handle_on_open_callback,
on_open_error_callback = self._handle_on_open_error_callback,
on_close_callback = self._handle_on_close_callback,
custom_ioloop = self._io_loop)
log.debug('AMQP connection started')
# Mark these channels as stale
for ch in channels:
try:
log.debug('Re-opening %s', ch)
ch._mark_stale()
except:
pass
except:
log.exception('Failed to replace connection')
def connect(self, timeout=None):
'''
Connect to AMQP.
'''
return self._run_async(
self._amqp_connection.connect,
(),
{},
timeout,
None,
None)
def close(self, *args, **kwargs):
'''
Disconnect from AMQP. This disables reconnections.
See pika.BaseConnection.close for details on arguments.
'''
timeout = kwargs.pop('timeout',None)
self._giveup = False
self._reconnect = False
return self._run_async(
async_func = self._amqp_connection.close,
args = args,
kwargs = kwargs,
timeout = timeout,
cb_func = None,
future = None)
def set_backpressure_multiplier(self, *args, **kwargs):
return self._run_async(
async_func = self._amqp_connection.set_backpressure_multiplier,
args = args,
kwargs = kwargs,
timeout = timeout,
cb_func = None,
future = None)
# ------------------------------------------------------------------------
# Connection callbacks
# ------------------------------------------------------------------------
def add_backpressure_callback(self, callback):
return self._run_async(
self._amqp_connection.add_backpressure_callback,
args=(self._get_callback(callback),))
def add_on_close_callback(self, callback):
return self._run_async(
self._amqp_connection.add_on_close_callback,
args=(self._get_callback(callback),))
def add_on_open_callback(self, callback):
return self._run_async(
self._amqp_connection.add_on_open_callback,
args=(self._get_callback(callback),))
def add_on_open_error_callback(self, callback):
return self._run_async(
self._amqp_connection.add_on_open_error_callback,
args=(self._get_callback(callback),))
# ------------------------------------------------------------------------
# Channel management
# ------------------------------------------------------------------------
def _cleanup_channels(self):
self._channels = filter(lambda cr : cr() is not None, self._channels)
def _get_channels(self):
self._cleanup_channels()
return [ch() for ch in self._channels]
@tornado.gen.coroutine
def channel(self, channel_number=None, timeout=None):
'''
Open a new channel on the AMQP connection.
'''
ch = AMQPChannel(self, channel_number)
yield ch._init_channel(timeout)
self._cleanup_channels()
self._channels.append(weakref.ref(ch))
raise tornado.gen.Return(ch)
# ------------------------------------------------------------------------
# Connection handling callbacks
# ------------------------------------------------------------------------
def _handle_on_open_callback(self, *args, **kwargs):
'''
What to do when the connection is opened?
'''
log = self._get_log('_handle_on_open_callback')
try:
log.info('Connection open')
self._reconnect = self._reconnect_delay.total_seconds() > 0
self._reconnect_rem = self._reconnect_max
self._giveup = False
# Grab a list of channels
channels = self._get_channels()
# Re-open those channels
for ch in channels:
log.debug('Re-opening %s', ch)
yield ch._init_channel()
log.debug('Calling open callbacks')
for cb in self._on_open_cb:
self._io_loop.add_callback(cb, *args, **kwargs)
except:
log.exception('Failed to handle connection open')
def _handle_on_close_callback(self, *args, **kwargs):
'''
What to do when the connection is closed?
'''
log = self._get_log('_handle_on_close_callback')
try:
log.info('Connection closed')
for cb in self._on_close_cb:
try:
cb(*args, **kwargs)
except:
log.exception('Exception in callback %s(*%s, **%s)',
cb, args, kwargs)
except:
log.exception('Exception in close callback')
self._reconnect()
def _handle_on_open_error_callback(self, connection, error,
*args, **kwargs):
'''
What to do if we fail to open the connection?
'''
log = self._get_log('_handle_on_open_error_callback')
try:
log.error('Received %s whilst attempting connection', error)
# Pass it through to the callback handler we were given
if self._on_open_error_cb is not None:
for cb in self._on_open_error_cb:
try:
cb(connection, error, *args, **kwargs)
except:
log.exception('Error in callback %s(*%s, **%s)',
cb, args, kwargs)
except:
log.exception('Failed in open-error callback')
self._reconnect()
def _reconnect(self):
'''
Perform a re-connection, if enabled.
'''
log = self._get_log('_reconnect')
try:
# Sanity check, ensure we're running in the correct thread for this.
# add_timeout is *NOT* thread-safe.
if not self._is_own_thread:
# This should do the trick!
self._io_loop.add_callback(_reconnect)
return
# Reconnect?
if self._reconnect:
if self._reconnect_max > 0:
self._reconnect_rem -= 1
self._reconnect = self._reconnect_rem > 0
self._giveup = not self._reconnect
if self._giveup:
log.error('This is our last connection attempt!')
# Schedule a reconnect. We do this by completely replacing the
# AMQP connection object.
self._io_loop.add_timeout(
self._reconnect_delay, self._replace_connection)
log.error('Re-connecting in %s.',
self._reconnect_delay)
elif self._giveup:
log.fatal('Giving up!')
for cb in self._on_giveup_cb:
try:
cb()
except:
log.exception('Exception in callback %s', cb)
except:
log.exception('Failure in recnnection')
# ------------------------------------------------------------------------
# Callback and thread-safety utilities
# ------------------------------------------------------------------------
@property
def _is_own_thread(self):
'''
Return True if we're running in our own thread.
'''
return self._io_thread is threading.current_thread()
def _schedule_timeout(self, timeout, future, cb_func=None,
cb_args=None, cb_kwargs=None):
'''
Place a TimeoutException into the future if the future is not
completed in time.
'''
assert self._is_own_thread, 'This isn\'t my thread!'
if cb_args is None:
cb_args = ()
if cb_kwargs is None:
cb_kwargs = {}
def _timeout():
if not future.done():
future.set_exception(TimeoutError())
if cb_func is not None:
cb_func(*cb_args, **cb_kwargs)
return self._io_loop.add_timeout(timeout, _timeout)
@tornado.gen.coroutine
def _run_async(self, async_func, args=None, kwargs=None, timeout=None,
cb_func=None, callback_name='callback', future=None):
'''
Run a method in our own thread and return its result.
args and kwargs are passed to async_func, which will be run in the
AMQP IOLoop.
timeout specifies a time limit on the operation, and will result in a
TimeoutError if the operation does not complete.
callback, if given, causes a callback keyword argument to be passed to
async_func which it should run when the result comes back.
return_in_future specifies
'''
log = self._get_log('_run_async')
try:
#log.debug('Run %s\n\twith args %s\n\tkwargs %s'\
# '\n\ttimeout %s\n\tstore result in future %s'\
# '\n\tand using callback %s (named %s)',
# async_func, args, kwargs, timeout, future,
# cb_func, callback_name)
if args is None:
args = ()
if kwargs is None:
kwargs = {}
timeout_ref = None
return_result = future is None
if self._is_own_thread:
#log.debug('We are running in our own thread')
# We're in our own thread. Is there a callback involved?
if cb_func is None:
# There isn't. This is easy then, just call the function and
# return the result. Since the function is synchronous, we
# ignore the timeout.
if return_result:
#log.debug('Running directly')
result = async_func(*args, **kwargs)
raise tornado.gen.Return(result)
else:
try:
result = async_func(*args, **kwargs)
future.set_result(result)
except Exception,e:
log.exception('Exception running %s',
async_func)
future.set_exception(e)
else:
#log.debug('Running via callback')
# The result (or an exception) will be placed in this future.
future = future or tornado.concurrent.Future()
# Create a timeout object if we're given a time limit.
if timeout is not None:
timeout_ref = self._schedule_timeout(timeout, future)
# The function takes a callback argument which provides the
# result we're after. We've been given the function that will
# rat through what was given to us and will provide the result.
# The first argument given to the callback is the future.
cb_func = self._get_callback(cb_func)
def _callback(*args, **kwargs):
log = self._get_log('_run_async._callback')
#log.debug('Got response args=%s, kwargs=%s',
# args, kwargs)
try:
cb_func(future, *args, **kwargs)
except TimeoutError:
timeout_ref = None
except Exception, e:
if not future.done():
future.set_exception(e)
else:
log.exception('Exception in callback')
kwargs = kwargs.copy()
kwargs[callback_name] = _callback
# Perform the action given...
#log.debug('Performing asynchronous operation')
try:
#log.debug('Executing %s(*%s, **%s)',
# async_func, args, kwargs)
async_func(*args, **kwargs)
except Exception, e:
if not return_result:
future.set_exception(e)
else:
raise
if timeout_ref is not None:
self._io_loop.remove_timeout(timeout_ref)
else:
assert self._io_thread.is_alive(), 'IO Thread is dead'
#log.debug('Scheduling to run in our own IOLoop (we are '\
# 'in a foreign thread; current=%s vs ours=%s)',
# threading.current_thread(), self._io_thread)
# We're in the wrong thread, so schedule this to take place in
# our own IOLoop. First, create a future to store this in.
future = future or tornado.concurrent.Future()
# Then call ourselves with the new future.
self._io_loop.add_callback(
self._run_async,
async_func=async_func,
args=args,
kwargs=kwargs,
timeout=timeout,
cb_func=cb_func,
callback_name=callback_name,
future=future)
#log.debug('Return result? %s', return_result)
if return_result:
# Wait for the result
#log.debug('Waiting for result')
result = yield future
if timeout_ref is not None:
self._io_loop.remove_timeout(timeout_ref)
#log.debug('Result = %s', result)
# Return the result
raise tornado.gen.Return(result)
else:
# Return nothing to prevent the IOLoop from waiting.
raise tornado.gen.Return(None)
except tornado.gen.Return:
raise
except:
log.exception('Failed to execute')
raise
def _get_callback(self, callback_ref, callback_args=None,
callback_kwargs=None):
'''
Return a callback function from the given reference.
'''
#log = self._get_log('_get_callback')
if callback_ref is None:
return None
if callback_args is None:
callback_args = ()
if callback_kwargs is None:
callback_kwargs = {}
if isinstance(callback_ref, tuple):
(callback_ioloop, callback_func) = callback_ref[0:2]
if len(callback_ref) > 2:
callback_args = callback_args + (callback_ref[2] or ())
if len(callback_ref) > 3:
callback_kwargs.update(callback_ref[3] or {})
else:
callback_ioloop = None
callback_func = callback_ref
if callback_ioloop is None:
callback_ioloop = self._io_loop
def _callback(*args, **kwargs):
log = self._get_log('_get_callback','_callback')
#log.debug('Called with\n\targs=%s,\n\tkwargs=%s', args, kwargs)
# TODO: why is this happening?
if (len(args) == 1) and isinstance(args[0], tuple):
args = args[0]
try:
cb_args = callback_args + args
cb_kwargs = callback_kwargs.copy()
cb_kwargs.update(kwargs)
if self._io_loop is callback_ioloop:
# We're running this ourselves
#log.debug('Calling function in own IOLoop')
cb_res = callback_func(*args, **kwargs)
if isinstance(cb_res, tornado.concurrent.Future):
#log.debug( 'Function is asynchronous, '\
# 'waiting for result')
@tornado.gen.coroutine
def _get_result(future):
#log.debug('Getting result')
try:
result = yield future
#log.debug('Callback result: %s', result)
except:
log.exception('Callback failed')
self._io_loop.add_future(cb_res, _get_result)
#else:
#log.debug('Callback result: %s', cb_res)
else:
#log.debug('Scheduling function in external IOLoop')
callback_ioloop.add_callback(
callback_func, *cb_args, **cb_kwargs)
#log.debug('Callback scheduled')
except:
log.exception('Callback failed: %s', callback_func)
#log.debug('Wrapping function %s in callback %s',
# callback_func, _callback)
return _callback
class AMQPChannel(AMQPObject):
'''
An abstraction for the AMQP Channel.
'''
def __init__(self, connection, channel_number):
'''
Initialise a new channel object.
'''
log = self._get_log('__init__')
log.debug('New channel')
self._connection = connection
self._channel_number= channel_number
self._channel = None
# Queues and exchanges to re-establish
self._exchanges = []
self._queues = []
_amqp_connection = property(lambda s : \
s._connection._amqp_connection)
def _mark_stale(self):
self._get_log('_mark_stale').debug(
'Channel is now stale')
self._channel = None
@tornado.gen.coroutine
def _init_channel(self, timeout=None):
log = self._get_log('channel')
def _callback(future, channel):
log.debug('Opened new channel %s', channel)
if not future.done():
future.set_result(channel)
else:
log.error('Future already done, cannot return channel')
channel.close()
log.debug('Opening new channel (number=%s)', self._channel_number)
try:
channel = yield self._connection._run_async(
async_func = self._amqp_connection.channel,
args = None,
kwargs = {'channel_number': self._channel_number},
timeout = timeout,
cb_func = _callback,
callback_name = 'on_open_callback',
future = None)
except:
log.exception('Failed to open channel')
raise
self._channel = channel
# If we had any exchanges, declare those now
for ex in self._get_exchanges():
try:
log.debug('Declaring exchange %s', ex.exchange)
yield ex.declare(timeout)
except:
log.exception('Failed to declare exchange %s', ex.exchange)
# If we had any queues, declare those now
for q in self._get_queues():
try:
log.debug('Declaring queue %s', q.routing_key)
yield q.declare(timeout)
except:
log.exception('Failed to declare exchange %s', q.routing_key)
# ------------------------------------------------------------------------
# Channel status
# ------------------------------------------------------------------------
is_closed = property(lambda s : s._channel.is_closed)
is_closing = property(lambda s : s._channel.is_closing)
is_open = property(lambda s : s._channel.is_open)
# ------------------------------------------------------------------------
# Channel callbacks
# ------------------------------------------------------------------------
def add_callback(self, callback, replies, one_shot=True):
return self._connection._run_async(
async_func=self._channel.add_callback,
args=(self._connection._get_callback(callback),
replies, one_shot),
kwargs={}, timeout=None, callback=None, future=None)
def add_on_cancel_callback(self, callback):
return self._connection._run_async(
self._channel.add_on_cancel_callback,
args=(self._connection._get_callback(callback),))
def add_on_close_callback(self, callback):
return self._connection._run_async(
self._channel.add_on_close_callback,
args=(self._connection._get_callback(callback),))
def add_on_flow_callback(self, callback):
return self._connection._run_async(
self._channel.add_on_flow_callback,
args=(self._connection._get_callback(callback),))
def add_on_return_callback(self, callback):
return self._connection._run_async(
self._channel.add_on_return_callback,
args=(self._connection._get_callback(callback),))
# ------------------------------------------------------------------------
# Channel operations
# ------------------------------------------------------------------------
def basic_ack(self, delivery_tag=0, multiple=False):
return self._connection._run_async(
self._channel.basic_ack, args=(delivery_tag, multiple))
def basic_cancel(self, callback=None, consumer_tag='', nowait=False):
return self._connection._run_async(
self._channel.basic_cancel,
args=(self._connection._get_callback(callback),
consumer_tag, nowait))
def basic_consume(self, consumer_callback, queue='', no_ack=False,
exclusive=False, consumer_tag=None, arguments=None):
return self._connection._run_async(
self._channel.basic_consume,
args=(self._connection._get_callback(consumer_callback),
queue, no_ack, exclusive, consumer_tag, arguments))
def basic_get(self, callback, queue='', no_ack=False):
return self._connection._run_async(
self._channel.basic_get,
args=(self._connection._get_callback(callback),
queue, no_ack))
def basic_nack(self, delivery_tag=None, multiple=False, requeue=True):
return self._connection._run_async(
self._channel.basic_nack,
args=(delivery_tag, multiple, requeue))
def basic_publish(self, exchange, routing_key, body, properties=None,
mandatory=False, immediate=False):
return self._connection._run_async(
self._channel.basic_publish,
args=(exchange, routing_key, body, properties,
mandatory, immediate))
def basic_qos(self, callback=None, prefetch_size=0, prefetch_count=0,
all_channels=False):
return self._connection._run_async(
self._channel.basic_qos,
args=(self._connection._get_callback(callback),
prefetch_size, prefetch_count, all_channels))
def basic_reject(self, delivery_tag=None, requeue=True):
return self._connection._run_async(
self._channel.basic_reject,
args=(delivery_tag, requeue))
def basic_recover(self, callback=None, prefetch_size=0, prefetch_count=0,
all_channels=False):
return self._connection._run_async(
self._channel.basic_qos,
args=(self._connection._get_callback(callback),
prefetch_size, prefetch_count, all_channels))
def close(self, *args, **kwargs):
return self._connection._run_async(
self._channel.close,
args=args, kwargs=kwargs)
def confirm_delivery(self, callback=None, nowait=False):
return self._connection._run_async(
self._channel.confirm_delivery,
args=(self._connection._get_callback(callback), nowait))
def open(self):
return self._connection._run_async(self._channel.open)
# ------------------------------------------------------------------------
# Exchange operations
# ------------------------------------------------------------------------
def _cleanup_exchanges(self):
self._exchanges = filter(lambda er : er() is not None, self._exchanges)
def _get_exchanges(self):
return [er() for er in self._exchanges]
@tornado.gen.coroutine
def exchange(self, exchange=None, exchange_type='direct',
passive=False, durable=False, auto_delete=False, internal=False,
nowait=False, arguments=None, timeout=None):
e = AMQPExchange(self, exchange, exchange_type, passive, durable,
auto_delete, internal, nowait, arguments)
yield e.declare(timeout)
self._cleanup_exchanges()
self._exchanges.append(weakref.ref(e))
raise tornado.gen.Return(e)
def exchange_bind(self, destination=None, source=None,
routing_key='', nowait=False, arguments=None,
timeout=None):
def _get_response(future, response):
future.set_result(response)
return self._connection._run_async(
self._channel.exchange_bind,
args=(destination, source, routing_key, nowait, arguments),
cb_func=_get_response,
timeout=timeout)
def exchange_declare(self, exchange=None, exchange_type='direct',
passive=False, durable=False, auto_delete=False, internal=False,
nowait=False, arguments=None, timeout=None):
def _get_response(future, response):
future.set_result(response)
return self._connection._run_async(
self._channel.exchange_declare,
kwargs={
'exchange': exchange,
'exchange_type': exchange_type,
'passive': passive,
'durable': durable,
'auto_delete': auto_delete,
'internal': internal,
'nowait': nowait,
'arguments': arguments,
},
cb_func=_get_response,
timeout=timeout)
def exchange_delete(self, exchange=None, if_unused=False, nowait=False,
timeout=None):
def _get_response(future, response):
future.set_result(response)
return self._connection._run_async(
self._channel.exchange_delete,
kwargs={
'exchange': exchange,
'if_unused': if_unused,
'nowait': nowait,
},
cb_func=_get_response,
timeout=timeout)
def exchange_unbind(self, destination=None, source=None,
routing_key='', nowait=False, arguments=None,
timeout=None):
def _get_response(future, response):
future.set_result(response)
return self._connection._run_async(
self._channel.exchange_unbind,
kwargs={
'destination': destination,
'source': source,
'routing_key': routing_key,
'nowait': nowait,
'arguments': arguments
},
cb_func=_get_response,
timeout=timeout)
# ------------------------------------------------------------------------
# Flow operations
# ------------------------------------------------------------------------
def flow(self, callback, active):
return self._connection._run_async(
self._channel.flow,
args=(self._connection._get_callback(callback), active))
# ------------------------------------------------------------------------
# Queue operations
# ------------------------------------------------------------------------
def _cleanup_queues(self):
self._queues = filter(lambda qr : qr() is not None, self._queues)
def _get_queues(self):
return [qr() for qr in self._queues]
@tornado.gen.coroutine
def queue(self, queue=None, passive=False, durable=False,
exclusive=False, auto_delete=False, nowait=False,
arguments=None, timeout=None):
q = AMQPQueue(self, queue, passive, durable, exclusive, auto_delete,
nowait, arguments)
yield q.declare(timeout)
self._cleanup_queues()
self._queues.append(weakref.ref(q))
raise tornado.gen.Return(q)
def queue_bind(self, queue, exchange, routing_key=None,
nowait=False, arguments=None, timeout=None):
def _get_response(future, response):
future.set_result(response)
return self._connection._run_async(
self._channel.queue_bind,
kwargs={
'queue': queue,
'exchange': exchange,
'routing_key': routing_key,
'nowait': nowait,
'arguments': arguments,
},
cb_func=_get_response,
timeout=timeout)
def queue_declare(self, queue='', passive=False, durable=False,
exclusive=False, auto_delete=False, nowait=False,
arguments=None, timeout=None):
def _get_response(future, response):
future.set_result(response)
return self._connection._run_async(
self._channel.queue_declare,
kwargs={
'queue': queue,
'passive': passive,
'durable': durable,
'exclusive': exclusive,
'auto_delete': auto_delete,
'nowait': nowait,
'arguments': arguments,
},
cb_func=_get_response,
timeout=timeout)
def queue_delete(self, queue='', if_unused=False,
if_empty=False, nowait=False, timeout=None):
def _get_response(future, response):
future.set_result(response)
return self._connection._run_async(
self._channel.queue_delete,
kwargs={
'queue': queue,
'if_unused': if_unused,
'if_empty': if_empty,
'nowait': nowait
},
cb_func=_get_response,
timeout=timeout)
def queue_purge(self, queue, nowait=False, timeout=None):
def _get_response(future, response):
future.set_result(response)
return self._connection._run_async(
self._channel.queue_purge,
kwargs={
'queue': queue,
'nowait': nowait
},
cb_func=_get_response,
timeout=timeout)
def queue_unbind(self, queue='', exchange=None,
routing_key=None, arguments=None, timeout=None):
def _get_response(future, response):
future.set_result(response)
return self._connection._run_async(
self._channel.queue_unbind,
kwargs={
'queue': queue,
'exchange': exchange,
'routing_key': routing_key,
'arguments': arguments
},
cb_func=_get_response,
timeout=timeout)
# ------------------------------------------------------------------------
# Transaction operations
# ------------------------------------------------------------------------
def tx_commit(self, timeout=None):
def _get_response(future, response):
future.set_result(response)
return self._connection._run_async(
self._channel.tx_commit,
cb_func=_get_response,
timeout=timeout)
def tx_rollback(self, timeout=None):
def _get_response(future, response):
future.set_result(response)
return self._connection._run_async(
self._channel.tx_rollback,
cb_func=_get_response,
timeout=timeout)
def tx_select(self, timeout=None):
def _get_response(future, response):
future.set_result(response)
return self._connection._run_async(
self._channel.tx_select,
cb_func=_get_response,
timeout=timeout)
class AMQPMessageDestination(AMQPObject):
'''
A AMQPMessageDestination object is a base class for Queues and Exchanges:
places you can send messages to. This base class handles the transmission
of a message.
'''
def __init__(self, channel, exchange, routing_key):
self._channel = channel
self._exchange = exchange
self._routing_key = routing_key
self._declared = False
_connection = property(lambda s : s._channel._connection)
_amqp_connection = property(lambda s : s._channel._amqp_connection)
exchange = property(lambda s : s._exchange)
routing_key = property(lambda s : s._routing_key)
@tornado.gen.coroutine
def basic_publish(self, body, properties=None, mandatory=False,
immediate=False, exchange=None, routing_key=None):
if exchange is None:
exchange = self.exchange
if routing_key is None:
routing_key = self.routing_key
raise tornado.gen.Return((
yield self._channel.basic_publish(
exchange, routing_key, body, properties, mandatory,
immediate)
))
class AMQPQueue(AMQPMessageDestination):
'''
A representation of a message queue.
'''
def __init__(self, channel, queue=None, passive=False, durable=False,
exclusive=False, auto_delete=False, nowait=False, arguments=None):
super(AMQPQueue, self).__init__(channel, '', None)
self._queue = queue
self._passive = passive
self._durable = durable
self._exclusive = exclusive
self._auto_delete = auto_delete
self._nowait = nowait
self._arguments = arguments
# Our bindings to instate
# Format:
# {
# exchange_name: {
# routing_key: {
# args: kwargs for queue_bind,
# bound: boolean indicating bind state,
# }
# },
# }
self._bindings = {}
# A list of consumers
self._consumers = []
# ------------------------------------------------------------------------
# Queue operations
# ------------------------------------------------------------------------
@tornado.gen.coroutine
def declare(self, timeout=None):
log = self._get_log('declare')
# Are we declaring passively?
if self._passive:
# We are, declare using a temporary channel
log.debug('Using temporary channel')
ch = yield self._channel._connection.channel()
else:
# We're not, use our own channel
log.debug('Using own channel')
ch = self._channel
# Declare the queue in the channel
try:
log.debug('Declaring queue')
queue_res = yield ch.queue_declare(
queue = self._queue or '',
passive = self._passive,
durable = self._durable,
exclusive = self._exclusive,
auto_delete = self._auto_delete,
nowait = self._nowait,
arguments = self._arguments,
timeout = timeout)
self._routing_key = queue_res.method.queue
log.debug('Queue is %s', self._routing_key)
except:
log.exception('Failed to declare queue (queue=%s)', self._queue)
# If we opened a temporary channel, close it
if self._passive and ch.is_open:
yield ch.close()
# If we were bound to things, re-bind
for exchange, bindings in self._bindings.copy().iteritems():
for routing_key, state in bindings.copy().iteritems():
yield self.bind(
exchange = exchange,
routing_key = routing_key,
nowait = state['args']['nowait'],
arguments = state['args']['arguments'],
timeout = timeout)
# If we have any consumers, re-start those
for c in self._get_consumers():
yield c.consume(timeout)
@tornado.gen.coroutine
def bind(self, exchange, routing_key=None,
nowait=False, arguments=None, timeout=None):
log = self._get_log('bind')
if isinstance(exchange, AMQPExchange):
exchange = exchange.exchange
state = {
'args': {
'nowait': nowait,
'arguments': arguments,
},
'bound': False,
}
try:
log.debug('Binding to %s:%s', exchange, routing_key)
result = yield self._channel.queue_bind(
queue = self.routing_key,
exchange = exchange or '',
routing_key = routing_key,
nowait = nowait,
arguments = arguments,
timeout = timeout)
# We should be bound now.
state['bound'] = True
except:
log.debug('Failed to bind to %s:%s', exchange, routing_key)
if exchange not in self._bindings:
self._bindings[exchange] = {}
self._bindings[exchange][routing_key] = state
raise tornado.gen.Return(state['bound'])
@tornado.gen.coroutine
def delete(self, if_unused=False, if_empty=False, nowait=False,
timeout=None):
log = self._get_log('delete')
try:
log.debug('Deleting queue %s', self.routing_key)
yield self._channel.queue_delete(
queue = self.routing_key,
if_unused = if_unused,
if_empty = if_empty,
nowait = nowait,
timeout = timeout
)
# We're deleted
self._routing_key = None
except:
log.exception('Failed to delete queue %s', self.routing_key)
raise
@tornado.gen.coroutine
def purge(self, queue, nowait=False, timeout=None):
yield self._channel.queue_purge(
queue = self.routing_key,
nowait = nowait,
timeout = timeout)
@tornado.gen.coroutine
def unbind(self, exchange=None, routing_key=None,
arguments=None, timeout=None):
if isinstance(exchange, AMQPExchange):
exchange = exchange.exchange
yield self._channel.queue_unbind(
queue = self.routing_key,
exchange = exchange,
routing_key = routing_key,
arguments = arguments,
timeout = timeout)
# If we got here, then we're unbound
try:
del(self._bindings[exchange][routing_key])
if len(self._bindings[exchange]) == 0:
del(self._bindings[exchange])
except KeyError:
pass
def _cleanup_consumers(self):
self._consumers = filter(lambda cr : cr() is not None, self._consumers)
def _get_consumers(self):
return [cr() for cr in self._consumers]
@tornado.gen.coroutine
def consume(self, consumer_callback, no_ack=False, exclusive=False,
consumer_tag=None, arguments=None, channel=None, timeout=None):
c = AMQPConsumer(channel or self._channel, consumer_callback, self,
no_ack, exclusive, consumer_tag, arguments)
yield c.consume(timeout)
self._cleanup_consumers()
self._consumers.append(weakref.ref(c))
raise tornado.gen.Return(c)
class AMQPExchange(AMQPMessageDestination):
'''
A representation of a message exchange.
'''
def __init__(self, channel, exchange=None, exchange_type='direct',
passive=False, durable=False, auto_delete=False, internal=False,
nowait=False, arguments=None):
super(AMQPExchange, self).__init__(channel, exchange, None)
self._exchange_type = exchange_type
self._passive = passive
self._durable = durable
self._auto_delete = auto_delete
self._internal = internal
self._nowait = nowait
self._arguments = arguments
self._declared = False
# Our bindings to instate
# Format:
# {
# exchange_name: {
# routing_key: {
# args: kwargs for queue_bind,
# bound: boolean indicating bind state,
# }
# },
# }
self._bindings = {}
# ------------------------------------------------------------------------
# Exchange operations
# ------------------------------------------------------------------------
@tornado.gen.coroutine
def declare(self, timeout=None):
log = self._get_log('declare')
# Are we declaring passively?
if self._passive:
# We are, declare using a temporary channel
log.debug('Using temporary channel')
ch = yield self._channel._connection.channel()
else:
# We're not, use our own channel
log.debug('Using own channel')
ch = self._channel
# Declare the queue in the channel
try:
log.debug('Declaring exchange')
queue_res = yield ch.exchange_declare(
exchange = self._exchange,
exchange_type = self._exchange_type,
passive = self._passive,
durable = self._durable,
auto_delete = self._auto_delete,
internal = self._internal,
nowait = self._nowait,
arguments = self._arguments,
timeout = timeout)
self._declared = True
log.debug('Exchange declared')
except:
log.exception('Failed to declare exchange %s', self._exchange)
# If we opened a temporary channel, close it
if self._passive and ch.is_open:
yield ch.close()
# If we were bound to things, re-bind
for exchange, bindings in self._bindings.copy().iteritems():
for routing_key, state in bindings.copy().iteritems():
yield self.bind(
exchange = exchange,
routing_key = routing_key,
nowait = state['args']['nowait'],
arguments = state['args']['arguments'],
timeout = timeout)
@tornado.gen.coroutine
def bind(self, exchange, routing_key=None,
nowait=False, arguments=None, timeout=None):
log = self._get_log('bind')
if isinstance(exchange, AMQPExchange):
exchange = exchange.exchange
state = {
'args': {
'nowait': nowait,
'arguments': arguments,
},
'bound': False,
}
try:
log.debug('Binding to %s:%s', exchange, routing_key)
result = yield self._channel.queue_bind(
destination = self.exchange,
source = exchange,
routing_key = routing_key or '',
nowait = nowait,
arguments = arguments,
timeout = timeout)
# We should be bound now.
state['bound'] = True
except:
log.debug('Failed to bind to %s:%s', exchange, routing_key)
if exchange not in self._bindings:
self._bindings[exchange] = {}
self._bindings[exchange][routing_key] = state
raise tornado.gen.Return(state['bound'])
@tornado.gen.coroutine
def delete(self, if_unused=False, nowait=False, timeout=None):
log = self._get_log('delete')
try:
log.debug('Deleting exchange %s', self.exchange)
yield self._channel.exchange_delete(
exchange = self.exchange,
if_unused = if_unused,
nowait = nowait,
timeout = timeout
)
self._declared = False
except:
log.exception('Failed to delete exchange %s', self.exchange)
raise
@tornado.gen.coroutine
def unbind(self, exchange=None, routing_key=None,
arguments=None, timeout=None):
if isinstance(exchange, AMQPExchange):
exchange = exchange.exchange
yield self._channel.exchange_unbind(
destination = self.exchange,
source = exchange,
routing_key = routing_key,
arguments = arguments,
timeout = timeout)
# If we got here, then we're unbound
try:
del(self._bindings[exchange][routing_key])
if len(self._bindings[exchange]) == 0:
del(self._bindings[exchange])
except KeyError:
pass
class AMQPConsumer(AMQPObject):
'''
An abstraction for a consumer.
'''
def __init__(self, channel, consumer_callback, queue=None, no_ack=False,
exclusive=False, consumer_tag=None, arguments=None):
self._channel = channel
self._consumer_callback = consumer_callback
self._queue = queue
self._no_ack = no_ack
self._exclusive = exclusive
self._consumer_tag_given= consumer_tag
self._arguments = arguments
self._consumer_tag = None
consumer_tag = property(lambda s : s._consumer_tag)
@tornado.gen.coroutine
def consume(self, timeout=None):
log = self._get_log('_consume')
try:
log.debug('Consuming queue %s', self._queue.routing_key)
result = yield self._channel.basic_consume(
consumer_callback = self._consumer_callback,
queue = self._queue.routing_key,
no_ack = self._no_ack,
exclusive = self._exclusive,
consumer_tag = self._consumer_tag_given,
arguments = self._arguments)
self._consumer_tag = result
log.debug('Consumer tag is %s', self._consumer_tag)
except:
log.exception('Failed to consume %s', self._queue.routing_key)
self._consumer_tag = None
raise
@tornado.gen.coroutine
def cancel(self, nowait=False, arguments=None, timeout=None):
log = self._get_log('cancel')
try:
log.debug('Cancelling consumer of queue %s', self._queue.routing_key)
yield self._channel.basic_cancel(
consumer_tag = self._consumer_tag_given,
nowait = nowait,
arguments = arguments)
self._consumer_tag = None
log.debug('Consumer is cancelled')
except:
log.exception('Failed to cancel consuming %s',
self._queue.routing_key)
raise
class TimeoutError(Exception):
'''
Exception raised when an operation times out.
'''
pass
if __name__ == '__main__':
# The following is a simple example that tries to
# create two channels (transmit and receive), declare an exchange,
# bind a new queue to it, then send a message to the exchange.
logging.basicConfig(
level=logging.DEBUG,
format= '%(asctime)s %(name)s[%(filename)s:%(lineno)4d] '\
'%(process)d/%(threadName)s %(levelname)s %(message)s')
io_loop = tornado.ioloop.IOLoop.instance()
msg_received = tornado.concurrent.Future()
@tornado.gen.coroutine
def receive(channel, method, properties, body):
logging.info(
('\n' + ('-'*80) +
'\nReceived\n\tchannel=%s,\n\tmethod=%s,'\
'\n\tproperties=%s, body=%s\n' +
('-'*80)),
channel, method, properties, body)
msg_received.set_result(True)
raise tornado.gen.Return(None)
@tornado.gen.coroutine
def on_connect(*args, **kwargs):
try:
logging.info('Opening a transmit channel')
tx_ch = yield amqp_conn.channel()
logging.debug('tx_ch = %s', tx_ch)
logging.info('Opening a receive channel')
rx_ch = yield amqp_conn.channel()
logging.info('Declaring a test exchange on the receive channel')
ex = yield rx_ch.exchange_declare(exchange='test',
exchange_type='fanout')
logging.info('Declaring a receive queue on the receive channel')
rx_q = yield rx_ch.queue_declare(exclusive=True)
logging.info('Binding the queue to the exchange')
bind = yield rx_ch.queue_bind( queue=rx_q.method.queue,
exchange='test', routing_key='#')
logging.info('Beginning consumation of queue')
consume = yield rx_ch.basic_consume(
consumer_callback = receive,
queue = rx_q.method.queue,
no_ack=True)
logging.info('Transmitting a test message')
tx = yield tx_ch.basic_publish(
exchange = 'test',
routing_key = 'test',
body = 'this is a test')
logging.info('Waiting to receive something')
yield msg_received
logging.info('Disconnecting')
yield amqp_conn.close()
logging.info('Stopping')
except:
logging.exception('on_connect failed')
io_loop.stop()
raise tornado.gen.Return(None)
amqp_conn = AMQPConnection(
pika.URLParameters('amqp://guest:guest@localhost:5672/%2f'),
io_loop=io_loop,
on_open_callback=on_connect)
io_loop.start()
@desertkun
Copy link

Hello! Found this code useful. Could you please tell if this code is production ready?

@desertkun
Copy link

Please note you have a mistake at line #301:
yield operator turns _handle_on_open_callback into a generator, and therefore breaks it.

I've fixed that in my fork.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment