Skip to content

Instantly share code, notes, and snippets.

@lglista
Created March 15, 2022 16:17
Show Gist options
  • Save lglista/d3d6370061ffc1dcccc2a00493ba7480 to your computer and use it in GitHub Desktop.
Save lglista/d3d6370061ffc1dcccc2a00493ba7480 to your computer and use it in GitHub Desktop.
python3 Observer Pattern (Publisher/Subscriber pattern)
from pubsub import PubSub
from test_classes import Test1, Test2, print_string, add_number
def main():
bus = PubSub()
test1 = Test1("hello")
test2 = Test2(2)
bus.subscribe(print_string, Test1)
bus.subscribe(add_number, Test2)
bus.publish(test1)
bus.publish(test2)
if __name__ == '__main__':
main()
class PubSub:
def __init__(self):
self.subscribers = {}
def subscribe(self, function, message):
if message not in self.subscribers:
self.subscribers[message] = []
self.subscribers[message].append(function)
def publish(self, message):
for function in self.subscribers[type(message)]:
function(message)
class Test1:
def __init__(self, string):
self.message = string
class Test2:
def __init__(self, int):
self.int = int
def print_string(message):
print(message.message)
def add_number(message):
print(1 + message.int)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment