Created
March 9, 2015 22:54
-
-
Save anonymous/a0796eed5d29cdadac8b to your computer and use it in GitHub Desktop.
circuits: test custom poller
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from circuits import BaseComponent, Component, Event, handler, Manager | |
import msgpack | |
import redis | |
class msg(Event): | |
"msg Event""" | |
class Hades(Component): | |
def started(self, *args): | |
print 'hades: starting...' | |
def stopped(self, *args): | |
print 'hades: stopping...' | |
def msg(self, *args): | |
print 'hades: msg event...' | |
class Hekla(BaseComponent): | |
def __init__(self): | |
super(Hekla, self).__init__() | |
self.redis = redis.StrictRedis() | |
self.pubsub = self.redis.pubsub(ignore_subscribe_messages=True) | |
self.pubsub.subscribe('test.fw') | |
@handler("generate_events") | |
def _on_generate_events(self, event): | |
message = self.pubsub.get_message() | |
if message: | |
data = msgpack.unpackb(message['data']) | |
self.fire(msg(data)) | |
manager = Manager() | |
Hades().register(manager) | |
Hekla().register(manager) | |
manager.run() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment