Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
class Kombu(object):
def __init__(self):
self.params = {
'hostname': 'localhost',
'port': 5672,
'virtual_host': '/',
'userid': 'guest',
'password': 'badpassword', # 'guest'
}
self.exchange_name = 'nova'
self.topic = 'notifications.info'
def connect(self):
import kombu.connection
import kombu.entity
self.connection = kombu.connection.BrokerConnection(**self.params)
self.connection.connect()
self.channel = self.connection.channel()
exchange = kombu.entity.Exchange(name=self.exchange_name,
type='topic',
durable=False,
auto_delete=False)
queue = kombu.entity.Queue(name=self.topic,
exchange=exchange,
routing_key=self.topic,
channel=self.channel,
durable=False,
auto_delete=False,
exclusive=False)
queue.declare()
queue.consume(consumer_tag=str(1),
nowait=False,
callback=self.callback)
def callback(self, raw_message):
message = self.channel.message_to_python(raw_message)
print message.payload
message.ack()
@property
def sock(self):
return self.connection.connection.sock
def process_pending(self):
self.connection.drain_events(timeout=0)
import select
import socket
import errno
k = Kombu()
k.connect()
p = select.poll()
p.register(k.sock, select.POLLIN|select.POLLPRI)
while True:
try:
k.process_pending()
except socket.error as e:
if e.errno != errno.EAGAIN:
raise e
p.poll()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment