Skip to content

Instantly share code, notes, and snippets.

@skrater
Created June 12, 2019 13:51
Show Gist options
  • Save skrater/b1e9d0c6157570bf4d8c00c4f3f00b7d to your computer and use it in GitHub Desktop.
Save skrater/b1e9d0c6157570bf4d8c00c4f3f00b7d to your computer and use it in GitHub Desktop.
# -*- coding: utf-8 -*-
class Topic(object):
    """A named publish/subscribe channel.

    Receivers are stored in a registry shared by every ``Topic`` instance
    and keyed by topic name, so two instances created with the same name
    address the same list of receivers.
    """

    # Shared on purpose: maps topic name -> list of receivers, in the
    # order they were subscribed.
    _receivers = {}

    def __init__(self, topic_name):
        """Bind this instance to the topic called ``topic_name``."""
        self._topic_name = topic_name

    def subscribe(self, receiver):
        """Register ``receiver`` to be called on every ``publish``.

        Subscribing the same callable more than once registers it more
        than once.

        Raises:
            ValueError: if ``receiver`` is not callable.
        """
        if not callable(receiver):
            raise ValueError('Receiver must be callable.')
        self._receivers.setdefault(self._topic_name, []).append(receiver)

    def publish(self, *args, **kwargs):
        """Call every receiver of this topic with the given arguments.

        Receivers are called in subscription order. An exception raised by
        a receiver propagates to the caller and stops the dispatch.
        """
        # Iterate over a snapshot: a receiver may subscribe or unsubscribe
        # while being called, and mutating the live list during iteration
        # would make the loop skip the receiver that follows it.
        for receiver in list(self._receivers.get(self._topic_name, ())):
            receiver(*args, **kwargs)

    def unsubscribe(self, receiver):
        """Remove the first registration of ``receiver`` from this topic.

        Raises:
            ValueError: if ``receiver`` is not subscribed to this topic.
        """
        receivers = self._receivers.get(self._topic_name, [])
        receivers.remove(receiver)

    def empty(self):
        """Drop every receiver of this topic; a no-op if there are none."""
        self._receivers.pop(self._topic_name, None)
def subscribe(topic_name):
    """Return a decorator that subscribes a callable to ``topic_name``.

    The decorated callable is handed back unchanged, so it can still be
    called directly after being registered.
    """
    def register(receiver):
        # Registration is delegated to Topic, which validates the receiver.
        topic = Topic(topic_name)
        topic.subscribe(receiver)
        return receiver

    return register
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment