Skip to content

Instantly share code, notes, and snippets.

@deleted
Created October 2, 2009 21:37
Show Gist options
  • Save deleted/200170 to your computer and use it in GitHub Desktop.
Save deleted/200170 to your computer and use it in GitHub Desktop.
from amqplib import client_0_8 as amqp
from amq_config import connection_params
import logging
logger = logging.getLogger()
DEFAULT_EXCHANGE = 'ngt.direct'
# Open the connection to RabitMQ
connection = ampq.Connection(**connection_params)
class MessageBus(object):
def __init__(self, **kwargs):
self._conn = connection
self._chan = self._conn.channel()
def __del__(self):
self._chan.close()
#self._conn.close()
@property
def connection(self):
return self._conn
@property
def channel(self):
return self._chan
def publish(self, msg, exchange=DEFAULT_EXCHANGE, routing_key=None):
msg = amqp.Message(msg)
msg.properties["delivery_mode"] = 2 # Sets as persistent
self._chan.basic_publish(msg,exchange=exchange,routing_key=routing_key)
def setup_direct_queue(self, queue, exchange=DEFAULT_EXCHANGE, routing_key=None, chan=None):
'''Simplifies the job of creating a directly-routed queue.'''
if not routing_key:
routing_key = queue
if not chan:
chan = self._chan
chan.exchange_declare(exchange=exchange, type="direct", durable=True, auto_delete=False,)
chan.queue_declare(queue=queue, durable=True, exclusive=False, auto_delete=False)
chan.queue_bind(queue=queue, exchange=exchange, routing_key=routing_key)
logger.debug("MessageBus (Ch#%d): Bound queue '%s' to exchange '%s' with routing key '%s'." % (chan.channel_id, queue, exchange, routing_key) )
def register_consumer(self, queuename, callback, exchange=DEFAULT_EXCHANGE, routing_key=None, chan=None):
'''
Declare a direct queue and attach a consumer to it. This assumes an exchange has already beed created.
By default, the routing key will be the same as the queue name.
Returns the consumer tag.
'''
if not routing_key:
routing_key = queuename
if not chan:
chan = self._chan
chan.queue_declare(queue=queuename, durable=True, auto_delete=False)
chan.queue_bind(queue=queuename, exchange=exchange, routing_key=routing_key)
logger.debug("MessageBus (Ch#%d): Bound queue '%s' to exchange '%s' with routing key '%s'." % (chan.channel_id, queuename, exchange, routing_key) )
logger.debug("MessageBus (Ch#%d): %s will consume from queue '%s'" % (chan.channel_id, str(callback), queuename))
return chan.basic_consume(callback=callback, queue=queuename)
def wait(self):
self._chan.wait()
def ack(self, *args, **kwargs):
self._chan.basic_ack(*args, **kwargs)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment