Skip to content

Instantly share code, notes, and snippets.

@gleicon
Created January 15, 2012 04:52
Show Gist options
  • Star 8 You must be signed in to star a gist
  • Fork 4 You must be signed in to fork a gist
  • Save gleicon/1614390 to your computer and use it in GitHub Desktop.
Save gleicon/1614390 to your computer and use it in GitHub Desktop.
gevent+kombu actor/mailbox using decorators
from __future__ import with_statement
from kombu import BrokerConnection
from collections import defaultdict
import gevent
from gevent import monkey
monkey.patch_all()
class WorkerHub():
"""
WorkerHub controls the local mailboxes that the @worker decorator assigns.
Also contains a method to send messages to given workers
Depends on kombu + gevent, uses SimpleQueue. default transport is redis
"""
def __init__(self, transport_url = "redis://127.0.0.1:6379"):
self.workers = defaultdict(set)
self._transport_url = transport_url
self._connected = False
def add_worker(self, workername, callback):
self.workers[workername].add(callback)
def _listener():
while self._connected == False:
print "waiting %s" % self._connected
gevent.sleep(1)
with BrokerConnection(self._transport_url) as conn:
with conn.SimpleQueue(workername) as queue:
while True:
message = queue.get(block=True, timeout=10)
if message:
self._execute_callbacks(workername, message.payload)
message.ack()
if self._connected == False: break
gevent.spawn(_listener)
def _execute_callbacks(self, workername, message):
for w in self.workers[workername]:
w(message)
def send_message(self, workername, message):
with BrokerConnection(self._transport_url) as conn:
with conn.SimpleQueue(workername) as queue:
queue.put(message)
def connect(self, transport_url=None):
self._connected = True
print "connected: %s" % self._connected
if transport_url: self._transport_url = transport_url
def disconnect(self):
self._connected = False
wh = WorkerHub()
def start_workers(transport_url = None):
"""
start all workers. provide a different transport url following kombu URI
scheme.
if no transport is given, defaults to local redis.
start_workers()
"""
if transport_url is None:
transport_url = "redis://127.0.0.1:6379"
wh.connect(transport_url)
def stop_workers():
wh.disconnect()
def worker(workername):
"""
gevent/kombu based worker
prepend this decorator to a function that might receive the message.
broadcasts the message to all callbacks appended.
ex:
@worker("/queue")
def my_processor(message):
print message
"""
def _decorator(f):
wh.add_worker(workername, f)
return f
return _decorator
def send_message(workername, message):
"""
send a message to a given worker, using the same key as the worker
decorator.
send_message("/queue", "my message")
"""
wh.send_message(workername, message)
# test
@worker('/workers/delicious')
def a_worker(message):
print "received: %s" % message
#send_message('/workers/jazz', "no waaay saaaap")
@worker('/workers/jazz')
def another_worker(message):
print "received2: %s" % message
send_message('/workers/delicious', "saaaap")
@worker('/workers/delicious')
def third_worker(message):
print "the other worker received: %s" % message
if __name__ == "__main__":
start_workers()
gevent.sleep(10)
print "acabou"
stop_workers()
from __future__ import with_statement
from kombu.common import maybe_declare
from kombu.pools import producers
from kombu import BrokerConnection, Exchange, Queue
from kombu.pools import connections
transport_url = "redis://127.0.0.1:6379"
connection = BrokerConnection(transport_url)
_queue = "/workers/delicious"
_queue2 = "/workers/jazz"
if __name__ == "__main__":
with connections[connection].acquire(block=True) as conn:
with conn.SimpleQueue(_queue) as queue:
queue.put("mensagem")
with connections[connection].acquire(block=True) as conn:
with conn.SimpleQueue(_queue2) as queue:
queue.put("mensagem2")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment