Skip to content

Instantly share code, notes, and snippets.

@funketh
Last active March 19, 2019 20:58
Show Gist options
  • Save funketh/7efc42456103ae9e2c67ea35916732ac to your computer and use it in GitHub Desktop.
Save funketh/7efc42456103ae9e2c67ea35916732ac to your computer and use it in GitHub Desktop.
Basic subscriber/publisher pattern implementation
import logging
from collections import defaultdict
class TopicManager:
    """Minimal in-process publish/subscribe registry.

    Callbacks are stored per topic in a ``set``, so subscribing the same
    callback to the same topic more than once has no additional effect, and
    the order in which callbacks are invoked is unspecified.
    """

    def __init__(self):
        # Maps topic -> set of callbacks; a missing topic yields an empty set
        # on first subscription.
        self.subscribers = defaultdict(set)

    def subscribe(self, topic, callback):
        """Subscribe a callback function to a topic.

        Args:
            topic: Hashable key identifying the topic.
            callback: Hashable callable invoked by ``notify`` with the
                positional and keyword data passed for this topic.
        """
        self.subscribers[topic].add(callback)
        logging.debug(f'Subscribed {callback} to {topic}')

    def notify(self, topic, *data, **kwdata):
        """Call every callback subscribed to ``topic``, passing the data.

        An ``Exception`` raised by a callback is logged with its traceback
        and does not prevent the remaining callbacks from running.
        Exceptions that derive only from ``BaseException`` (for example
        ``KeyboardInterrupt`` and ``SystemExit``) are not intercepted.

        Args:
            topic: Topic whose subscribers should be called.
            *data: Positional arguments forwarded to each callback.
            **kwdata: Keyword arguments forwarded to each callback.
        """
        # Use .get() so notifying an unknown topic does not insert an empty
        # entry into the defaultdict, and iterate over a snapshot so a
        # callback may subscribe to this topic without breaking iteration.
        for sub in tuple(self.subscribers.get(topic, ())):
            try:
                sub(*data, **kwdata)
            except Exception:
                logging.exception(f'Callback "{sub}", subscribed to '
                                  f'"{topic}", raised an exception when called '
                                  f'with args:{data}, kwargs:{kwdata}')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment