Skip to content

Instantly share code, notes, and snippets.

@markmc
Created May 31, 2013 15:07
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save markmc/5685616 to your computer and use it in GitHub Desktop.
Save markmc/5685616 to your computer and use it in GitHub Desktop.
Playing with kombu and async processing
import kombu.connection
import kombu.entity
import kombu.messaging
# Producer demo: publish a single message to the 'kombu-test' topic
# exchange with routing key 'topic1', then release the connection.

# Broker endpoint used by this demo.
params = {
    'hostname': 'localhost',
    'port': 5672,
    'virtual_host': '/',
}

connection = kombu.connection.BrokerConnection(**params)
connection.connect()
try:
    channel = connection.channel()

    # Non-durable topic exchange that is not auto-deleted.
    exchange = kombu.entity.Exchange(name='kombu-test',
                                     type='topic',
                                     durable=False,
                                     auto_delete=False)

    producer = kombu.messaging.Producer(exchange=exchange,
                                        channel=channel,
                                        routing_key='topic1')
    producer.publish('foo')
finally:
    # Release the broker connection even when publishing fails, so the
    # socket is not left open until interpreter exit.
    connection.release()
class Kombu(object):
    """Small kombu consumer bound to one topic queue.

    The instance stores the broker parameters, the exchange name
    ('kombu-test') and the topic ('topic1'). ``connect()`` opens the
    connection, declares the queue and registers ``callback`` as its
    consumer.
    """

    def __init__(self):
        """Store the broker parameters, exchange name and topic."""
        self.params = dict(hostname='localhost', port=5672, virtual_host='/')
        self.exchange_name = 'kombu-test'
        self.topic = 'topic1'

    def connect(self):
        """Open the connection, declare the queue and start consuming.

        Sets ``self.connection`` and ``self.channel``. The queue is named
        after ``self.topic`` and uses the same value as its routing key.
        """
        import kombu.connection
        import kombu.entity

        self.connection = conn = kombu.connection.BrokerConnection(
            **self.params)
        conn.connect()
        self.channel = conn.channel()

        topic_exchange = kombu.entity.Exchange(
            name=self.exchange_name,
            type='topic',
            durable=False,
            auto_delete=False,
        )
        queue_options = dict(
            name=self.topic,
            exchange=topic_exchange,
            routing_key=self.topic,
            channel=self.channel,
            durable=False,
            auto_delete=False,
            exclusive=False,
        )
        topic_queue = kombu.entity.Queue(**queue_options)
        topic_queue.declare()
        topic_queue.consume(
            consumer_tag='1',
            nowait=False,
            callback=self.callback,
        )

    def callback(self, raw_message):
        """Convert *raw_message* via the channel, print its payload, ack it."""
        decoded = self.channel.message_to_python(raw_message)
        # Single-argument parenthesised form prints the same text whether
        # ``print`` is a statement or a function.
        print(decoded.payload)
        decoded.ack()

    @property
    def sock(self):
        """Return ``self.connection.connection.sock``.

        NOTE(review): presumably the transport's raw socket, suitable for
        select/poll registration - confirm for the transport in use.
        """
        inner_connection = self.connection.connection
        return inner_connection.sock

    def process_pending(self):
        """Call ``drain_events`` on the connection with a timeout of 0."""
        self.connection.drain_events(timeout=0)
import select
import socket
import errno
# Consumer demo: connect, then alternate between draining pending broker
# events and sleeping in poll() on the connection's socket.
k = Kombu()
k.connect()

p = select.poll()
p.register(k.sock, select.POLLIN | select.POLLPRI)

while True:
    try:
        k.process_pending()
    except socket.timeout:
        # kombu documents drain_events() as raising socket.timeout when
        # nothing arrives within the timeout; treat it as "nothing pending".
        pass
    except socket.error as e:
        # EAGAIN/EWOULDBLOCK mean the read would block, i.e. nothing is
        # pending. Anything else is a real error.
        if e.errno not in (errno.EAGAIN, errno.EWOULDBLOCK):
            # Bare raise keeps the original traceback.
            raise
    else:
        # An event was handled; keep draining so frames that were already
        # received are not left unprocessed while poll() blocks.
        continue

    print("polling")
    p.poll()
    print("poll returned")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment