Created
October 2, 2009 21:37
-
-
Save deleted/200170 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from amqplib import client_0_8 as amqp | |
from amq_config import connection_params | |
import logging | |
logger = logging.getLogger() | |
DEFAULT_EXCHANGE = 'ngt.direct' | |
# Open the connection to RabitMQ | |
connection = ampq.Connection(**connection_params) | |
class MessageBus(object): | |
def __init__(self, **kwargs): | |
self._conn = connection | |
self._chan = self._conn.channel() | |
def __del__(self): | |
self._chan.close() | |
#self._conn.close() | |
@property | |
def connection(self): | |
return self._conn | |
@property | |
def channel(self): | |
return self._chan | |
def publish(self, msg, exchange=DEFAULT_EXCHANGE, routing_key=None): | |
msg = amqp.Message(msg) | |
msg.properties["delivery_mode"] = 2 # Sets as persistent | |
self._chan.basic_publish(msg,exchange=exchange,routing_key=routing_key) | |
def setup_direct_queue(self, queue, exchange=DEFAULT_EXCHANGE, routing_key=None, chan=None): | |
'''Simplifies the job of creating a directly-routed queue.''' | |
if not routing_key: | |
routing_key = queue | |
if not chan: | |
chan = self._chan | |
chan.exchange_declare(exchange=exchange, type="direct", durable=True, auto_delete=False,) | |
chan.queue_declare(queue=queue, durable=True, exclusive=False, auto_delete=False) | |
chan.queue_bind(queue=queue, exchange=exchange, routing_key=routing_key) | |
logger.debug("MessageBus (Ch#%d): Bound queue '%s' to exchange '%s' with routing key '%s'." % (chan.channel_id, queue, exchange, routing_key) ) | |
def register_consumer(self, queuename, callback, exchange=DEFAULT_EXCHANGE, routing_key=None, chan=None): | |
''' | |
Declare a direct queue and attach a consumer to it. This assumes an exchange has already beed created. | |
By default, the routing key will be the same as the queue name. | |
Returns the consumer tag. | |
''' | |
if not routing_key: | |
routing_key = queuename | |
if not chan: | |
chan = self._chan | |
chan.queue_declare(queue=queuename, durable=True, auto_delete=False) | |
chan.queue_bind(queue=queuename, exchange=exchange, routing_key=routing_key) | |
logger.debug("MessageBus (Ch#%d): Bound queue '%s' to exchange '%s' with routing key '%s'." % (chan.channel_id, queuename, exchange, routing_key) ) | |
logger.debug("MessageBus (Ch#%d): %s will consume from queue '%s'" % (chan.channel_id, str(callback), queuename)) | |
return chan.basic_consume(callback=callback, queue=queuename) | |
def wait(self): | |
self._chan.wait() | |
def ack(self, *args, **kwargs): | |
self._chan.basic_ack(*args, **kwargs) | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment