Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Python NotificationsManager and observer
class Singleton(type):
_instances = {}
def __call__(cls, *args, **kwargs):
if cls not in cls._instances:
cls._instances[cls] = super(Singleton, cls).__call__(*args, **kwargs)
return cls._instances[cls]
class observes(object):
def __init__(self, name):
self.name = name
def __call__(self, f):
def wrapped_f(*args, **kwargs):
return f(*args, **kwargs)
NotificationsManager().register(self.name, wrapped_f)
return wrapped_f
class NotificationsManager(object):
__metaclass__ = Singleton
def __init__(self):
self.observers = dict()
def register(self, name, observer):
self.__verify_initialized(name)
if observer not in self.observers[name]:
self.observers[name].append(observer)
def unregister(self, name, observer):
self.__verify_initialized(name)
if observer in self.observers[name]:
self.observers[name].remove(observer)
def unregister_all(self):
self.observers.clear()
def notify(self, name, *args, **kwargs):
self.__verify_initialized(name)
for observer in self.observers[name]:
observer(*args, **kwargs)
def __verify_initialized(self, name):
if not self.observers.get(name):
self.observers[name] = []
if __name__ == "__main__":
class Foo(object):
@staticmethod
@observes("my_event")
def onMyEvent():
print("Foo.onMyEvent called!")
NotificationsManager().notify("my_event")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.