Skip to content

Instantly share code, notes, and snippets.

@onjin
Created May 6, 2014 13:37
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save onjin/0f13341405c1323660df to your computer and use it in GitHub Desktop.
Save onjin/0f13341405c1323660df to your computer and use it in GitHub Desktop.
internal pubsub
from collections import defaultdict
from contextlib import contextmanager
class Exchange:
def __init__(self):
self._subscribers = set()
def attach(self, task):
self._subscribers.add(task)
def detach(self, task):
self._subscribers.remove(task)
def send(self, msg):
for subscriber in self._subscribers:
subscriber.send(msg)
@contextmanager
def subscribe(self, *tasks):
for task in tasks:
self.attach(task)
try:
yield
finally:
for task in tasks:
self.detach(task)
_exchanges = defaultdict(Exchange)
def get_exchange(name):
return _exchanges[name]
class Task:
def send(self, msg):
pass
class DisplayMessage:
def __init__(self):
self.count = 0
def send(self, msg):
self.count += 1
print("msg[{}]: {!r}".format(self.count, msg))
debug = DisplayMessage()
task_a = Task()
task_b = Task()
exc = get_exchange('name')
with exc.subscribe(task_a, task_b, debug):
exc.send('msg1')
exc.send('msg2')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment