Created
May 31, 2013 15:07
-
-
Save markmc/5685616 to your computer and use it in GitHub Desktop.
Playing with kombu and async processing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import kombu.connection | |
import kombu.entity | |
import kombu.messaging | |
# Publish a single test message to a topic exchange on a local AMQP broker.

# Broker location passed straight to the kombu connection constructor.
params = {
    'hostname': 'localhost',
    'port': 5672,
    'virtual_host': '/',
}

connection = kombu.connection.BrokerConnection(**params)
connection.connect()
try:
    channel = connection.channel()

    # Non-durable topic exchange that is not auto-deleted.
    exchange = kombu.entity.Exchange(name='kombu-test',
                                     type='topic',
                                     durable=False,
                                     auto_delete=False)

    # Every message sent through this producer uses routing key 'topic1'.
    producer = kombu.messaging.Producer(exchange=exchange,
                                        channel=channel,
                                        routing_key='topic1')

    producer.publish('foo')
finally:
    # Release the broker connection even when declaring or publishing
    # fails; the original script left it open.
    connection.release()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class Kombu(object):
    """Small kombu consumer whose socket can be polled by the caller.

    The object declares a queue bound to a topic exchange, registers
    ``callback`` as its consumer and exposes ``sock`` and
    ``process_pending`` so that an external loop can decide when to
    handle incoming messages.
    """

    def __init__(self, hostname='localhost', port=5672, virtual_host='/',
                 exchange_name='kombu-test', topic='topic1'):
        """Store the connection parameters; no network activity happens here.

        :param hostname: broker host name.
        :param port: broker port.
        :param virtual_host: virtual host to use on the broker.
        :param exchange_name: name of the topic exchange to declare.
        :param topic: used both as the queue name and as the routing key.
        """
        self.params = {
            'hostname': hostname,
            'port': port,
            'virtual_host': virtual_host,
        }
        self.exchange_name = exchange_name
        self.topic = topic

    def connect(self):
        """Connect to the broker, declare the queue and start consuming.

        Sets ``self.connection`` and ``self.channel``.
        """
        # Kept function-local, as in the original, so that defining the
        # class does not itself require kombu to be importable.
        import kombu.connection
        import kombu.entity

        self.connection = kombu.connection.BrokerConnection(**self.params)
        self.connection.connect()
        self.channel = self.connection.channel()

        exchange = kombu.entity.Exchange(name=self.exchange_name,
                                         type='topic',
                                         durable=False,
                                         auto_delete=False)

        # The queue is named after the topic and bound with the topic as
        # its routing key.
        queue = kombu.entity.Queue(name=self.topic,
                                   exchange=exchange,
                                   routing_key=self.topic,
                                   channel=self.channel,
                                   durable=False,
                                   auto_delete=False,
                                   exclusive=False)
        queue.declare()
        queue.consume(consumer_tag=str(1),
                      nowait=False,
                      callback=self.callback)

    def callback(self, raw_message):
        """Convert a raw delivery, print its payload and acknowledge it.

        :param raw_message: the object handed to the consumer callback;
            it is passed to ``self.channel.message_to_python``.
        """
        message = self.channel.message_to_python(raw_message)
        # Single-argument print(...) behaves the same on Python 2 and 3.
        print(message.payload)
        message.ack()

    @property
    def sock(self):
        """Return ``self.connection.connection.sock``.

        NOTE(review): presumably the transport's underlying socket; the
        attribute is provided by the client library beneath kombu, so
        confirm it exists for the transport in use.
        """
        return self.connection.connection.sock

    def process_pending(self):
        """Call ``drain_events`` on the connection with a timeout of zero.

        Any exception raised by ``drain_events`` propagates to the caller.
        """
        self.connection.drain_events(timeout=0)
import select | |
import socket | |
import errno | |
# Drive the Kombu consumer from a poll() loop: handle whatever is pending,
# then block until the connection's socket becomes readable again.
k = Kombu()
k.connect()

p = select.poll()
p.register(k.sock, select.POLLIN | select.POLLPRI)

try:
    while True:
        try:
            k.process_pending()
        except socket.error as e:
            # "Nothing to read right now" is expected and simply means we
            # should go back to polling. EWOULDBLOCK is checked as well
            # because it is not guaranteed to equal EAGAIN everywhere.
            if e.errno not in (errno.EAGAIN, errno.EWOULDBLOCK):
                # Bare raise keeps the original traceback intact.
                raise

        print("polling")
        p.poll()
        print("poll returned")
finally:
    # The loop only ends through an exception (including Ctrl-C); release
    # the broker connection on the way out instead of leaking it.
    k.connection.release()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment