Skip to content

Instantly share code, notes, and snippets.

@dankrause
Last active August 29, 2015 13:57
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save dankrause/9488553 to your computer and use it in GitHub Desktop.
Save dankrause/9488553 to your computer and use it in GitHub Desktop.
Toy pub/sub implementation in python
from functools import partial
from fnmatch import fnmatchcase
from collections import defaultdict
class Event(dict):
handlers = defaultdict(list)
@classmethod
def subscribe(cls, *args):
if not callable(args[-1]):
return partial(cls.subscribe, *args)
for pattern in args[:-1]:
cls.handlers[pattern].append(args[-1])
return args[-1]
def __init__(self, event_type, *args, **kwargs):
self.type = event_type
self._published = False
super(Event, self).__init__(*args, **kwargs)
def publish(self, repeat=False):
if repeat or not self._published:
self._published = True
matches = filter(partial(fnmatchcase, self.type), self.handlers.keys())
callbacks = [i for s in [self.handlers[m] for m in matches] for i in s]
return {c.__name__: c(self) for c in set(callbacks)}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment