Skip to content

Instantly share code, notes, and snippets.

Created March 9, 2015 22:54
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save anonymous/a0796eed5d29cdadac8b to your computer and use it in GitHub Desktop.
Save anonymous/a0796eed5d29cdadac8b to your computer and use it in GitHub Desktop.
circuits: test custom poller
from circuits import BaseComponent, Component, Event, handler, Manager
import msgpack
import redis
class msg(Event):
    """msg Event

    Fired by ``Hekla`` with one unpacked Redis payload as its only
    positional argument.
    """
    # NOTE(review): the lower-case class name is kept on purpose; the
    # ``Hades.msg`` handler is presumably matched to this event by name.
class Hades(Component):
    """Component that reports lifecycle and ``msg`` events on stdout.

    NOTE(review): relies on circuits' ``Component`` treating public methods
    as handlers for the events of the same name -- confirm against the
    installed circuits version.
    """

    # print is called with a single parenthesised argument so the module
    # parses and prints the same text on both Python 2 and Python 3.

    def started(self, *args):
        """Announce that the component has started."""
        print('hades: starting...')

    def stopped(self, *args):
        """Announce that the component is stopping."""
        print('hades: stopping...')

    def msg(self, *args):
        """Announce that a ``msg`` event was received."""
        print('hades: msg event...')
class Hekla(BaseComponent):
    """Custom poller that turns Redis pub/sub messages into ``msg`` events.

    Subscribes to the ``test.fw`` channel on a default ``StrictRedis``
    connection and checks it for a pending message each time the manager
    fires ``generate_events``.
    """

    # Longest time, in seconds, the manager is asked to idle between two
    # polls of the Redis subscription when nothing was received.
    POLL_INTERVAL = 0.1

    def __init__(self):
        """Connect to Redis and subscribe to the ``test.fw`` channel."""
        super(Hekla, self).__init__()
        self.redis = redis.StrictRedis()
        # Subscribe/unsubscribe confirmations are filtered out so that
        # get_message() only hands back published payloads.
        self.pubsub = self.redis.pubsub(ignore_subscribe_messages=True)
        self.pubsub.subscribe('test.fw')

    @handler("generate_events")
    def _on_generate_events(self, event):
        """Poll Redis once and fire ``msg`` with the unpacked payload.

        :param event: the ``generate_events`` event; its remaining wait
            time is reduced so the manager keeps calling this poller.
        """
        message = self.pubsub.get_message()
        if not message:
            # Nothing pending: ask to be polled again soon. Without this
            # the idle manager is expected to wait without a timeout and
            # the subscription would not be checked again.
            event.reduce_time_left(self.POLL_INTERVAL)
            return

        data = msgpack.unpackb(message['data'])
        self.fire(msg(data))
        # An event was queued and more messages may be waiting, so request
        # that the manager does not sleep before the next cycle.
        event.reduce_time_left(0)
# Build the manager, attach one instance of each component in order
# (each is constructed right before it is registered), then run the loop.
manager = Manager()
for component_cls in (Hades, Hekla):
    component_cls().register(manager)
manager.run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment