Created
May 10, 2009 06:28
-
-
Save nathanborror/109521 to your computer and use it in GitHub Desktop.
A very simple way to interact with python amqplib
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Flopsy | |
====== | |
A very simple way to interact with python AMQPlib. For my case I'm using | |
RabbitMQ as my implementation but it should work with others. | |
settings.py | |
----------- | |
AMQP_SERVER = '24.143.38.241' | |
AMQP_PORT = 5672 | |
AMQP_USER = 'guest' | |
AMQP_PASSWORD = 'guest' | |
AMQP_VHOST = '/' | |
Consumer | |
-------- | |
>>> from flopsy import Connection, Consumer | |
>>> consumer = Consumer(connection=Connection()) | |
>>> consumer.declare(queue='po_box', exchange='sorting_room', routing_key='jason', auto_delete=False) | |
>>> def message_callback(message): | |
... print 'Recieved: ' + message.body | |
... consumer.channel.basic_ack(message.delivery_tag) | |
>>> | |
>>> consumer.register(message_callback) | |
>>> consumer.wait() | |
Publisher | |
--------- | |
>>> from flopsy import Connection, Publisher | |
>>> publisher = Publisher(connection=Connection(), exchange='sorting_room', routing_key='jason') | |
>>> publisher.publish('Test message!') | |
>>> publisher.close() | |
""" | |
from django.conf import settings | |
from amqplib import client_0_8 as amqp | |
class Connection(object): | |
def __init__(self, *args, **kwargs): | |
self.host = kwargs.get('host', getattr(settings, 'AMQP_SERVER')) | |
self.user_id = kwargs.get('user_id', getattr(settings, 'AMQP_USER')) | |
self.password = kwargs.get('password', getattr(settings, 'AMQP_PASSWORD')) | |
self.vhost = kwargs.get('vhost', getattr(settings, 'AMQP_VHOST', '/')) | |
self.port = kwargs.get('port', getattr(settings, 'AMQP_PORT', 5672)) | |
self.insist = False | |
self.connect() | |
def connect(self): | |
self.connection = amqp.Connection(host='%s:%s' % (self.host, self.port), userid=self.user_id, | |
password=self.password, virtual_host=self.vhost, insist=self.insist) | |
class Consumer(object): | |
def __init__(self, connection): | |
self.connection = connection | |
self.channel = self.connection.connection.channel() | |
def close(self): | |
if getattr(self, 'channel'): | |
self.channel.close() | |
if getattr(self, 'connection'): | |
self.connection.close() | |
def declare(self, queue, exchange, routing_key, durable=True, exclusive=False, auto_delete=False): | |
self.queue = queue | |
self.exchange = exchange | |
self.routing_key = routing_key | |
self.channel.queue_declare(queue=self.queue, durable=durable, | |
exclusive=exclusive, auto_delete=auto_delete) | |
self.channel.exchange_declare(exchange=self.exchange, type='direct', | |
durable=durable, auto_delete=auto_delete) | |
self.channel.queue_bind(queue=self.queue, exchange=self.exchange, | |
routing_key=self.routing_key) | |
def wait(self): | |
while True: | |
self.channel.wait() | |
def register(self, callback, queue=None, consumer_tag='flopsy_callback'): | |
if hasattr(self, 'queue') or queue: | |
self.consumer_tag = consumer_tag | |
self.channel.basic_consume(queue=getattr(self, 'queue', queue), no_ack=True, | |
callback=callback, consumer_tag=consumer_tag) | |
def unregister(self, consumer_tag='flopsy_callback'): | |
self.channel.basic_cancel(consumer_tag) | |
class Publisher(object): | |
def __init__(self, connection, exchange, routing_key, delivery_mode=2): | |
self.connection = connection | |
self.channel = self.connection.connection.channel() | |
self.exchange = exchange | |
self.routing_key = routing_key | |
self.delivery_mode = delivery_mode | |
def publish(self, message_data): | |
message = amqp.Message(message_data) | |
message.properties['delivery_mode'] = self.delivery_mode | |
self.channel.basic_publish(message, exchange=self.exchange, routing_key=self.routing_key) | |
return message | |
def close(self): | |
if getattr(self, 'channel'): | |
self.channel.close() | |
if getattr(self, 'connection'): | |
self.connection.connection.close() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
I'm wondering whether there's a way to wait for a single message with flopsy, without mucking with the underlying amqplib code.
consumer.wait()
seems to go into an infinite loop that can't be broken; is that right? Is there a way to just check for messages periodically?