Skip to content

Instantly share code, notes, and snippets.

@ask
Forked from ericflo/ericflopsy.py
Created June 7, 2009 20:36
Show Gist options
  • Save ask/125476 to your computer and use it in GitHub Desktop.
Save ask/125476 to your computer and use it in GitHub Desktop.
"""
Consumer
--------
>>> import ericflopsy
>>> conn = ericflopsy.Connection()
>>> consumer = ericflopsy.Consumer(connection=conn)
>>> def print_message(message_body, message):
...     print 'Received: ' + message_body
...     message.ack()
>>> consumer.register('messages_to_print', print_message)
>>> def print_message_in_caps(message_body, message):
...     print 'Received: ' + message_body.upper()
...     message.ack()
>>> consumer.register('messages_to_print_in_caps', print_message_in_caps)
>>> consumer.wait()
Publisher
---------
>>> import ericflopsy
>>> conn = ericflopsy.Connection()
>>> publisher = ericflopsy.Publisher(connection=conn)
>>> publisher.publish('messages_to_print', 'Test message!')
>>> publisher.publish('messages_to_print_in_caps', 'hello, world!')
"""
from carrot.messaging import Consumer as BaseConsumer
from carrot.messaging import Publisher as BasePublisher
from carrot.connection import DjangoAMQPConnection as Connection
# and voila. you have unit tests as well ;)
class Consumer(BaseConsumer):
    """Consumer that dispatches each message to a callback chosen by "kind".

    Incoming message payloads are expected to be mappings with a ``"kind"``
    entry naming the handler and a ``"data"`` entry carrying the body (the
    shape produced by ``Publisher.publish`` in this module).
    """

    # Per-instance registry mapping kind -> callback; created in __init__.
    named_callbacks = None

    def __init__(self, *args, **kwargs):
        """Initialise the base consumer and an empty callback registry.

        All positional and keyword arguments are forwarded unchanged to
        the base class.
        """
        # Name this class explicitly; the previous reference to
        # ``NamedCallbackConsumer`` was undefined and raised NameError.
        super(Consumer, self).__init__(*args, **kwargs)
        self.named_callbacks = {}

    def register(self, kind, callback):
        """Register ``callback`` as the handler for messages of ``kind``.

        A later registration for the same ``kind`` replaces the earlier one.
        The callback is invoked as ``callback(data, message)``.
        """
        self.named_callbacks[kind] = callback

    def unregister(self, kind):
        """Remove the handler for ``kind``.

        Raises:
            KeyError: if no handler is registered for ``kind``.
        """
        del self.named_callbacks[kind]

    def receive(self, message_data, message):
        """Dispatch one message to the callback registered for its kind.

        ``message_data`` must support ``.get``; its ``"kind"`` entry selects
        the handler and its ``"data"`` entry is passed to it together with
        ``message``. Messages whose kind has no registered handler are
        ignored.
        """
        kind = message_data.get("kind")
        data = message_data.get("data")
        # Look up in the registry filled by register(); ``self.callbacks``
        # is a different attribute and is not keyed by kind.
        callback_for_kind = self.named_callbacks.get(kind)
        if callback_for_kind:
            callback_for_kind(data, message)

    # Backward-compatible alias for the original misspelled method name.
    receieve = receive
class Publisher(BasePublisher):
    """Publisher that tags every outgoing payload with a "kind"."""

    def publish(self, kind, message_data):
        """Send ``message_data`` wrapped in a kind/data envelope.

        The envelope is a dict with ``"kind"`` set to ``kind`` and
        ``"data"`` set to ``message_data``; whatever ``self.send`` returns
        is returned to the caller.
        """
        envelope = {
            "kind": kind,
            "data": message_data,
        }
        return self.send(envelope)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment