Created
December 7, 2020 14:57
-
-
Save chkoar/c70020845f16fc5c1f2547714bbe19d9 to your computer and use it in GitHub Desktop.
A lightweight message bus based on Reactive Extensions for Python (RxPY)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import rx | |
from rx import operators as ops | |
from rx.subject import Subject | |
class MessageBus:
    """In-process message bus built on an RxPY ``Subject``.

    Handlers are registered per message class. Every dispatched message is
    pushed through one shared subject and is delivered to each handler whose
    registered class matches the message via ``isinstance`` (so handlers
    registered for a base class also receive subclass instances).
    """

    def __init__(self, scheduler=None):
        """Create a bus with no registered handlers.

        Args:
            scheduler: Optional scheduler forwarded to ``rx.just`` when a
                matching message is re-emitted to a handler. ``None`` is
                passed through unchanged, leaving the choice to RxPY.
        """
        self.subject = Subject()
        self.scheduler = scheduler

    def dispatch(self, message):
        """Push *message* into the subject for delivery to matching handlers."""
        self.subject.on_next(message)

    def register(self, message_class, handler):
        """Subscribe *handler* to messages that are instances of *message_class*.

        Args:
            message_class: Class (or tuple of classes) tested with
                ``isinstance`` against every dispatched message.
            handler: Callable invoked with each matching message.

        Returns:
            The object returned by ``subscribe`` (a disposable in RxPY).
            Call its ``dispose()`` method to unregister the handler.
            Earlier versions returned ``None``, so ignoring the result
            remains valid.
        """

        def is_match(message):
            return isinstance(message, message_class)

        def reemit(message):
            # self.scheduler is looked up per message (not captured once), so
            # reassigning bus.scheduler later still takes effect.
            return rx.just(message, scheduler=self.scheduler)

        matching = self.subject.pipe(
            ops.filter(is_match),
            ops.flat_map(reemit),
        )
        return matching.subscribe(handler)
if __name__ == "__main__":
    # Small demo: two message types share one handler that prints whatever
    # it receives. The classes are defined at module level (inside this
    # guard) so their dataclass repr stays "SomethingHappened(data=...)".
    from dataclasses import dataclass

    @dataclass
    class SomethingHappened:
        data: str

    @dataclass
    class SomethingOtherHappened:
        data: str

    def handler(message):
        print(message)

    bus = MessageBus()

    # Register the same handler for both message types, in declaration order.
    for message_type in (SomethingHappened, SomethingOtherHappened):
        bus.register(message_type, handler)

    bus.dispatch(SomethingHappened("Something Happened"))
    bus.dispatch(SomethingOtherHappened("Something Other Happened"))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment