Skip to content

Instantly share code, notes, and snippets.

@philipcristiano
Last active January 3, 2019 03:23
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save philipcristiano/4627474 to your computer and use it in GitHub Desktop.
Save philipcristiano/4627474 to your computer and use it in GitHub Desktop.
Simple Tornado/Pika wrapper. It is part of an internal library, so a missing tornado or pika dependency raises an error when the class is instantiated rather than when the module is imported.
try:
import pika
from pika.adapters.tornado_connection import TornadoConnection
except ImportError:
pika = None
try:
import tornado
import tornado.ioloop
except ImportError:
tornado = None
class TornadoQueueConnection(object):
    """Publish messages with publisher confirms over pika's TornadoConnection.

    The connection is opened from the Tornado IOLoop shortly after
    construction and is re-opened whenever it closes unexpectedly.
    Every ``publish`` call takes a ``callback`` that is invoked with a
    single boolean: ``True`` when the broker acks the message, ``False``
    when it nacks it or when the message could not be sent / confirmed.

    Attributes:
        ioloop: the Tornado IOLoop the connection is scheduled on.
        connection: the most recent connection attempt (may not be open yet).
    """

    def __init__(self, host, user='guest', password='guest', vhost='/'):
        """Store connection parameters and schedule the first connect.

        Args:
            host: broker hostname.
            user: user name for plain credentials.
            password: password for plain credentials.
            vhost: AMQP virtual host.

        Raises:
            ImportError: if tornado or pika could not be imported by the
                module. (ImportError is an Exception subclass, so callers
                catching Exception keep working.)
        """
        if tornado is None:
            raise ImportError('You must add tornado to your requirements!')
        if pika is None:
            raise ImportError('You must add pika to your requirements!')

        self._parameters = pika.ConnectionParameters(
            host=host,
            credentials=pika.PlainCredentials(user, password),
            virtual_host=vhost
        )
        # Most recent connection attempt; it becomes ``_connection`` once
        # the on-open callback fires.
        self.connection = None
        self._connection = None
        self._channel = None
        # True between requesting a channel and receiving it, so repeated
        # publishes do not request several channels at once.
        self._channel_opening = False
        # True after close() so the on-close handler does not reconnect.
        self._closing = False
        # Counts publishes on the current channel; used as the key that
        # confirmations are matched against.
        self._delivery_tag = 0
        self._confirmation_callbacks = {}

        self.ioloop = tornado.ioloop.IOLoop.instance()
        # Defer the connect to the next IOLoop iteration.
        self.ioloop.add_callback(self._connect)

    def publish(self, exchange, routing_key, headers, body, callback):
        """Publish ``body`` and report the outcome through ``callback``.

        If the connection or the channel is not usable yet, a (re)connect
        or channel open is started, ``callback(False)`` is called exactly
        once and nothing is sent.

        Args:
            exchange: exchange name to publish to.
            routing_key: routing key for the message.
            headers: value for the message's ``headers`` property
                (may be None).
            body: message payload.
            callback: callable taking one bool (confirmed or not).
        """
        if self._connection is None or self._connection.is_closed:
            self._ensure_connecting()
            callback(False)
            return
        if self._channel is None or self._channel.is_closed:
            self._open_channel()
            callback(False)
            return

        properties = pika.BasicProperties(
            content_type='text/plain',
            headers=headers,
        )
        self._channel.basic_publish(exchange, routing_key, body, properties)
        # Register the callback only after the publish call succeeded.
        self._delivery_tag += 1
        self._confirmation_callbacks[self._delivery_tag] = callback

    def publish_json(self, exchange, routing_key, headers, body, callback):
        """Serialize ``body`` to JSON and publish it via ``publish``."""
        # The original code referenced ujson without importing it; prefer
        # it when installed and fall back to the standard library.
        try:
            import ujson as serializer
        except ImportError:
            import json as serializer
        data = serializer.dumps(body)
        self.publish(exchange, routing_key, headers, data, callback)

    def _on_delivery_confirmation(self, method_frame):
        """Dispatch a broker confirmation to the matching callback(s).

        The confirmation type is taken from the second component of
        ``method_frame.method.NAME``; ``'ack'`` means success, anything
        else is reported as failure. Unknown tags are ignored.
        """
        method = method_frame.method
        confirmation_type = method.NAME.split('.')[1].lower()
        success = confirmation_type == 'ack'
        tag = method.delivery_tag

        if getattr(method, 'multiple', False):
            # A "multiple" confirmation covers every outstanding tag up to
            # and including ``tag`` (AMQP publisher-confirm semantics).
            tags = sorted(
                t for t in self._confirmation_callbacks if t <= tag
            )
        else:
            tags = [tag]

        for confirmed in tags:
            callback = self._confirmation_callbacks.pop(confirmed, None)
            if callback is not None:
                callback(success)

    def close(self):
        """Close the connection and suppress the automatic reconnect."""
        self._closing = True
        if self._connection is not None and not self._connection.is_closed:
            self._connection.close()

    def _ensure_connecting(self):
        """Start a connection attempt unless one is still in flight."""
        attempt = self.connection
        # NOTE(review): assumes a connection that is still being set up
        # reports is_closed == False -- confirm for the pika version in use.
        if attempt is None or attempt.is_closed:
            # An explicit publish after close() re-enables the connection.
            self._closing = False
            self._connect()

    def _connect(self):
        """Create a new TornadoConnection; ``_on_connected`` fires on open."""
        self.connection = TornadoConnection(
            self._parameters,
            on_open_callback=self._on_connected,
            stop_ioloop_on_close=False,
        )

    def _on_connected(self, connection):
        """Record the opened connection and open a channel on it."""
        self._connection = connection
        self._connection.add_on_close_callback(self._on_connection_closed)
        if self._closing:
            # close() was requested while the connection was being set up.
            self._connection.close()
            return
        self._channel_opening = False
        self._open_channel()

    def _on_connection_closed(self, *args):
        """Reset state, fail pending confirmations and maybe reconnect.

        Accepts any positional arguments; presumably the arguments pika
        passes to close callbacks differ between releases -- none are used.
        """
        self._connection = None
        self._channel = None
        self._channel_opening = False
        # Confirmations for these messages can no longer arrive.
        self._fail_pending_confirmations()
        if not self._closing:
            self._connect()

    def _open_channel(self):
        """Request a channel on the open connection (at most one at a time)."""
        if self._channel_opening:
            return
        self._channel_opening = True
        try:
            self._connection.channel(self._on_channel_open)
        except Exception:
            # Allow a later retry if the request itself failed.
            self._channel_opening = False
            raise

    def _on_channel_open(self, channel):
        """Adopt the new channel and enable delivery confirmations on it."""
        self._channel_opening = False
        # Delivery tags are scoped to a channel: restart the counter and
        # fail callbacks that belonged to a previous channel.
        self._fail_pending_confirmations()
        self._delivery_tag = 0
        self._channel = channel
        self._channel.confirm_delivery(self._on_delivery_confirmation)

    def _fail_pending_confirmations(self):
        """Call every outstanding confirmation callback with False."""
        pending = self._confirmation_callbacks
        self._confirmation_callbacks = {}
        for tag in sorted(pending):
            pending[tag](False)
@gaufung
Copy link

gaufung commented Jan 3, 2019

It seems that it doesn't work as expected

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment