Skip to content

Instantly share code, notes, and snippets.

@nathanborror
Created May 10, 2009 06:28
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 5 You must be signed in to fork a gist
  • Save nathanborror/109521 to your computer and use it in GitHub Desktop.
Save nathanborror/109521 to your computer and use it in GitHub Desktop.
A very simple way to interact with python amqplib
"""
Flopsy
======
A very simple way to interact with python AMQPlib. For my case I'm using
RabbitMQ as my implementation but it should work with others.
settings.py
-----------
AMQP_SERVER = '24.143.38.241'
AMQP_PORT = 5672
AMQP_USER = 'guest'
AMQP_PASSWORD = 'guest'
AMQP_VHOST = '/'
Consumer
--------
>>> from flopsy import Connection, Consumer
>>> consumer = Consumer(connection=Connection())
>>> consumer.declare(queue='po_box', exchange='sorting_room', routing_key='jason', auto_delete=False)
>>> def message_callback(message):
...     print 'Received: ' + message.body
...     consumer.channel.basic_ack(message.delivery_tag)
>>>
>>> consumer.register(message_callback)
>>> consumer.wait()
Publisher
---------
>>> from flopsy import Connection, Publisher
>>> publisher = Publisher(connection=Connection(), exchange='sorting_room', routing_key='jason')
>>> publisher.publish('Test message!')
>>> publisher.close()
"""
from django.conf import settings
from amqplib import client_0_8 as amqp
class Connection(object):
    """Hold AMQP connection parameters and open an amqplib connection.

    Every parameter may be supplied as a keyword argument (``host``,
    ``user_id``, ``password``, ``vhost``, ``port``).  A parameter that is
    not supplied is read from the Django ``settings`` object instead:

    * ``host``     <- ``settings.AMQP_SERVER``   (required)
    * ``user_id``  <- ``settings.AMQP_USER``     (required)
    * ``password`` <- ``settings.AMQP_PASSWORD`` (required)
    * ``vhost``    <- ``settings.AMQP_VHOST``    (falls back to ``'/'``)
    * ``port``     <- ``settings.AMQP_PORT``     (falls back to ``5672``)

    Raises:
        AttributeError: if a required parameter is neither passed as a
            keyword argument nor defined in settings.

    The opened amqplib connection is stored on ``self.connection``.
    """

    # (attribute / keyword name, settings name, optional fallback value).
    # An empty fallback tuple means the setting is required.
    _PARAMETERS = (
        ('host', 'AMQP_SERVER', ()),
        ('user_id', 'AMQP_USER', ()),
        ('password', 'AMQP_PASSWORD', ()),
        ('vhost', 'AMQP_VHOST', ('/',)),
        ('port', 'AMQP_PORT', (5672,)),
    )

    def __init__(self, *args, **kwargs):
        # Positional arguments are accepted for backward compatibility but
        # are not used.
        for name, setting, fallback in self._PARAMETERS:
            if name in kwargs:
                value = kwargs[name]
            else:
                # Consult settings only when the caller omitted the keyword,
                # so a fully explicit call works even if the corresponding
                # setting is not defined.
                value = getattr(settings, setting, *fallback)
            setattr(self, name, value)
        self.insist = False
        self.connect()

    def connect(self):
        """Open the amqplib connection and store it on ``self.connection``."""
        self.connection = amqp.Connection(
            host='%s:%s' % (self.host, self.port),
            userid=self.user_id,
            password=self.password,
            virtual_host=self.vhost,
            insist=self.insist,
        )
class Consumer(object):
    """Consume messages from an AMQP queue.

    ``connection`` is a flopsy ``Connection`` wrapper, i.e. an object whose
    ``connection`` attribute is the underlying amqplib connection.  A
    channel is opened on that connection at construction time and stored
    on ``self.channel``.
    """

    def __init__(self, connection):
        self.connection = connection
        self.channel = self.connection.connection.channel()

    def close(self):
        """Close the channel, then the underlying amqplib connection."""
        if getattr(self, 'channel', None):
            self.channel.close()
        if getattr(self, 'connection', None):
            # self.connection is the flopsy wrapper, which has no close();
            # the amqplib connection that does lives on its .connection.
            self.connection.connection.close()

    def declare(self, queue, exchange, routing_key, durable=True,
                exclusive=False, auto_delete=False):
        """Declare a queue and a direct exchange, and bind them together.

        The queue, exchange and routing key are remembered on the instance
        so that ``register`` can later consume from the declared queue.
        ``durable`` and ``auto_delete`` are applied to both the queue and
        the exchange; ``exclusive`` applies to the queue only.
        """
        self.queue = queue
        self.exchange = exchange
        self.routing_key = routing_key
        self.channel.queue_declare(queue=self.queue, durable=durable,
                                   exclusive=exclusive,
                                   auto_delete=auto_delete)
        self.channel.exchange_declare(exchange=self.exchange, type='direct',
                                      durable=durable,
                                      auto_delete=auto_delete)
        self.channel.queue_bind(queue=self.queue, exchange=self.exchange,
                                routing_key=self.routing_key)

    def wait(self):
        """Block forever, repeatedly waiting on the channel.

        This loop has no exit condition; it only ends if ``channel.wait()``
        raises.
        """
        while True:
            self.channel.wait()

    def register(self, callback, queue=None, consumer_tag='flopsy_callback',
                 no_ack=True):
        """Start consuming from a queue, delivering messages to ``callback``.

        Args:
            callback: callable invoked with each delivered message.
            queue: queue to consume from.  When omitted (or falsy), the
                queue remembered by ``declare`` is used.  An explicit value
                takes precedence over the declared queue.
            consumer_tag: tag identifying this consumer; pass the same tag
                to ``unregister`` to cancel it.
            no_ack: forwarded to ``basic_consume``.  Defaults to ``True``
                (the historical behaviour); pass ``False`` when the
                callback acknowledges messages itself via ``basic_ack``.

        If no queue was passed and ``declare`` has not been called, nothing
        is registered and the method returns silently.
        """
        if queue:
            target = queue
        elif hasattr(self, 'queue'):
            target = self.queue
        else:
            return
        self.consumer_tag = consumer_tag
        self.channel.basic_consume(queue=target, no_ack=no_ack,
                                   callback=callback,
                                   consumer_tag=consumer_tag)

    def unregister(self, consumer_tag='flopsy_callback'):
        """Cancel the consumer registered under ``consumer_tag``."""
        self.channel.basic_cancel(consumer_tag)
class Publisher(object):
    """Publish messages to one exchange with a fixed routing key.

    ``connection`` is a flopsy ``Connection`` wrapper whose ``connection``
    attribute is the underlying amqplib connection; a channel is opened on
    it at construction time.  ``delivery_mode`` is stamped onto every
    published message (the default of 2 is AMQP's persistent mode).
    """

    def __init__(self, connection, exchange, routing_key, delivery_mode=2):
        self.connection = connection
        self.channel = connection.connection.channel()
        self.exchange, self.routing_key = exchange, routing_key
        self.delivery_mode = delivery_mode

    def publish(self, message_data):
        """Wrap ``message_data`` in an AMQP message, publish it, return it."""
        outgoing = amqp.Message(message_data)
        outgoing.properties['delivery_mode'] = self.delivery_mode
        self.channel.basic_publish(
            outgoing,
            exchange=self.exchange,
            routing_key=self.routing_key,
        )
        return outgoing

    def close(self):
        """Close the channel, then the underlying amqplib connection."""
        if self.channel:
            self.channel.close()
        if self.connection:
            # The wrapper keeps the real amqplib connection on .connection.
            self.connection.connection.close()
@TrevorBurnham
Copy link

I'm wondering whether there's a way to wait for a single message with flopsy, without mucking with the underlying amqplib code. consumer.wait() seems to go into an infinite loop that can't be broken; is that right? Is there a way to just check for messages periodically?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment