While this started as a quick gist, I ended up developing it far enough to give it its own repository and documentation.
Visit Shout's Repo!
While this started as a quick gist, I ended up developing it far enough to give it its own repository and documentation.
Visit Shout's Repo!
'''
Shout
-----
A loud messaging module.
'''
import inspect | |
from collections import Sequence, defaultdict | |
import sys | |
ROOM_DEFL = "void" | |
class MetaMsg(type):
    '''Metaclass giving every class it creates its own listener registry.

    Each class built through this metaclass is handed a brand new
    ``listeners`` attribute, a ``defaultdict(set)``. Because the attribute
    is assigned on the class being created, a subclass never shares the
    registry of its parent.
    '''

    def __new__(mcs, name, bases, members):
        created = super(MetaMsg, mcs).__new__(mcs, name, bases, members)
        # One registry per class; unknown keys start out as empty sets.
        created.listeners = defaultdict(set)
        return created
class Message(object):
    '''Base class for messages that are shouted to registered listeners.

    Every Message class (this one and each subclass) owns a ``listeners``
    registry, supplied by ``MetaMsg``, mapping a room name to the set of
    callables listening in that room.

    Instance attributes:
        room:    name of the room this message is shouted to.
        args:    positional arguments forwarded to each listener.
        kwargs:  keyword arguments forwarded to each listener.
        results: return values collected from the listeners.
        exc:     None, a UserWarning instance (nobody was listening), or a
                 ``sys.exc_info()`` tuple (a listener raised).
        success: True only when every listener returned without raising.
    '''

    # Python 2 metaclass hook; gives each Message class its own registry.
    __metaclass__ = MetaMsg

    def __init__(self, *args, **kwargs):
        '''Store the payload and pick the target room.

        The ``to`` keyword names the room and is removed from ``kwargs``
        so it is not forwarded to listeners. Without it the message goes
        to ``ROOM_DEFL``.
        '''
        self.room = kwargs.pop("to", ROOM_DEFL)
        self.args = args
        self.kwargs = kwargs
        self.results = []
        self.exc = None
        self.success = False

    def shout(self):
        '''Call every listener in this message's room with the payload.

        Return values are appended to ``results``. If the room has no
        listeners, ``exc`` is set to a UserWarning. If a listener raises,
        ``exc`` is set to ``sys.exc_info()`` and the remaining listeners
        are skipped. ``success`` becomes True only when all listeners ran.
        '''
        listeners = self.listeners[self.room]
        if not listeners:
            self.exc = UserWarning(
                "Nobody is listening to room: {0}".format(self.room))
            return
        for listener in listeners:
            try:
                result = listener(*self.args, **self.kwargs)
            except Exception:
                # Only ordinary errors are captured; KeyboardInterrupt and
                # SystemExit must keep propagating.
                self.exc = sys.exc_info()
                return
            self.results.append(result)
        self.success = True

    @classmethod
    def add_listener(cls, fn):
        '''Register ``fn`` in every room named by ``fn.rooms``.

        Returns the class so calls can be chained.
        '''
        for room in fn.rooms:
            cls.listeners[room].add(fn)
        return cls

    @classmethod
    def rem_listener(cls, fn):
        '''Remove ``fn`` from every room of this message class.

        Removing a callable that was never registered is a no-op.
        Returns the class so calls can be chained.
        '''
        for room_set in cls.listeners.values():
            # discard() belongs to the per-room set, not to the registry.
            room_set.discard(fn)
        return cls

    @staticmethod
    def new(name):
        '''Create and return a new Message subclass called ``name``.'''
        message = type(name, (Message,), {})
        return message
class HasEars(object):
    '''Mixin that registers an instance's tagged methods as listeners.

    On construction, every member of the instance's class whose
    ``has_ears`` attribute is truthy is looked up on the instance and
    passed to ``add_listener`` of each type in the member's ``msg_types``.
    '''

    def __init__(self, *args, **kwargs):
        tagged = [
            member
            for _, member in inspect.getmembers(self.__class__)
            if getattr(member, "has_ears", False)
        ]
        for member in tagged:
            # Fetch through the instance so the listener is a bound method.
            bound = getattr(self, member.__name__)
            for msg_type in member.msg_types:
                msg_type.add_listener(bound)
        # Continue the cooperative __init__ chain after registration.
        super(HasEars, self).__init__(*args, **kwargs)
def typecheck_args(args):
    '''Ensure ``args`` is a sequence containing only Message subclasses.

    Returns True when every item is a subclass of Message (an empty
    sequence also passes).

    Raises TypeError when ``args`` is not a sequence, or when any item is
    not a class derived from Message.
    '''
    if not isinstance(args, Sequence):
        raise TypeError(
            "Wrong argument signature passed to hears decorator..."
            "Pass a Message subclass or multiple Message subclasses.")
    for item in args:
        # issubclass() raises its own, unrelated TypeError for non-class
        # items (e.g. a string), so rule those out first and report the
        # intended message instead.
        if not (isinstance(item, type) and issubclass(item, Message)):
            raise TypeError(
                "All arguments passed to hears must be"
                " subclasses of Message")
    return True
def hears(*args, **kwargs):
    '''Decorator factory marking a function or method as a Message listener.

    Positional arguments are the Message subclasses to listen for. The
    optional ``inside`` keyword names the room (a single string) or rooms
    (an iterable of strings) to listen in; it defaults to ``(ROOM_DEFL,)``.

    The decorated callable is tagged with ``has_ears``, ``msg_types`` and
    ``rooms``. A callable whose first parameter is named ``self`` is
    treated as a method and only tagged; HasEars registers the bound
    method when an instance is created. Any other callable is registered
    with each message type immediately.

    The decorator always returns the original callable, so the decorated
    name stays usable.

    Raises TypeError (from typecheck_args) when the positional arguments
    are not all Message subclasses.
    '''
    def wrapper(fn):
        typecheck_args(args)  # Make sure all our args are Message subclasses
        fn.has_ears = True
        fn.msg_types = args
        rooms = kwargs.get("inside", (ROOM_DEFL,))
        if isinstance(rooms, basestring):
            # A lone room name becomes a one-element tuple so that it is
            # not iterated character by character.
            rooms = (rooms,)
        fn.rooms = rooms
        argspec = inspect.getargspec(fn)
        is_method = bool(argspec.args) and argspec.args[0] == "self"
        if not is_method:
            for msg_type in fn.msg_types:
                msg_type.add_listener(fn)
        # Return fn in both cases; otherwise a decorated module-level
        # function would be rebound to None.
        return fn
    return wrapper
def shout(msg_type, *args, **kwargs):
    '''Instantiate ``msg_type``, shout it, and return the instance.

    All remaining positional and keyword arguments are handed to the
    ``msg_type`` constructor unchanged.
    '''
    message = msg_type(*args, **kwargs)
    message.shout()
    return message
from shout import Message, HasEars, hears, shout, typecheck_args | |
from nose.tools import * | |
class GetClasses(Message):
    '''Test message answered by listeners with their own class.'''
class GetException(Message):
    '''Test message whose listener raises, to exercise error capture.'''
class Greet(Message):
    '''Test message answered by listeners with a greeting string.'''
class SendArgsKwargs(Message):
    '''Test message whose listener echoes back the arguments it receives.'''
class A(HasEars):
    '''Listener fixture whose methods hear in the default room.'''

    @hears(Greet)
    def a_class_method(self):
        '''Answer a Greet message.'''
        greeting = "Hi from a!"
        return greeting

    @hears(GetClasses)
    def give_class(self):
        '''Answer a GetClasses message with this instance's class.'''
        return type(self)
class B(HasEars):
    '''Listener fixture that greets only inside rooms "B" and "C".'''

    @hears(Greet, inside=("B", "C"))
    def b_class_method(self):
        '''Answer a Greet message shouted to room "B" or "C".'''
        greeting = "Hi from b!"
        return greeting

    @hears(GetClasses)
    def give_class(self):
        '''Answer a GetClasses message with this instance's class.'''
        return type(self)
@hears(Greet, inside=("Unbound",))
def module_level_fn():
    '''Answer a Greet message shouted to the "Unbound" room.'''
    greeting = "Hi from module_level_fn!"
    return greeting
@hears(SendArgsKwargs)
def module_level_fn_args_kwargs(*args, **kwargs):
    '''Echo the received arguments back as an ``(args, kwargs)`` pair.'''
    received = (args, kwargs)
    return received
@hears(GetException)
def module_level_fn_raises_exc(*args, **kwargs):
    '''Always raise AttributeError so error capture can be tested.'''
    failure = AttributeError("Bad error!")
    raise failure
class Test_Shout(object):
    '''Tests for shouting messages to methods and module-level functions.'''

    @classmethod
    def setup_class(cls):
        '''Create one A and one B so their @hears methods get registered.'''
        # Stored on the class so the fixtures are explicit rather than
        # unused locals.
        cls.a = A()
        cls.b = B()

    def test_shout(self):
        '''A message without a room reaches default-room listeners only.'''
        msg = shout(Greet)
        assert msg.results == ['Hi from a!']

    def test_room(self):
        '''A message sent to a room reaches that room's listener.'''
        msg = shout(Greet, to="Unbound")
        assert msg.results == ["Hi from module_level_fn!"]

    def test_rooms(self):
        '''A listener registered in several rooms hears in each of them.'''
        msg_b = shout(Greet, to="B")
        msg_c = shout(Greet, to="C")
        assert msg_b.results == ["Hi from b!"]
        assert msg_c.results == ["Hi from b!"]

    def test_exc(self):
        '''An exception raised by a listener is captured on the message.'''
        msg_e = shout(GetException)
        assert isinstance(msg_e.exc[1], AttributeError)

    def test_results(self):
        '''Results from every listener are collected.'''
        msg = shout(GetClasses)
        assert A in msg.results
        assert B in msg.results

    def test_args_kwargs(self):
        '''Positional and keyword arguments are forwarded to listeners.'''
        msg = shout(SendArgsKwargs, "oh yes", kwarg="right")
        assert (("oh yes", ), {"kwarg": "right"}) in msg.results

    def test_typecheck_args(self):
        '''typecheck_args accepts Message subclasses and rejects the rest.'''
        assert typecheck_args((Greet, GetClasses, SendArgsKwargs))
        # Each invalid input is checked on its own: under a single
        # @raises(TypeError) the first failure ends the test and the
        # remaining cases would never run.
        assert_raises(TypeError, typecheck_args, ("A", Greet))
        assert_raises(TypeError, typecheck_args, "A")
        assert_raises(TypeError, typecheck_args, Greet)