Skip to content

Instantly share code, notes, and snippets.

@chkoar
Created December 7, 2020 14:57
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save chkoar/c70020845f16fc5c1f2547714bbe19d9 to your computer and use it in GitHub Desktop.
Save chkoar/c70020845f16fc5c1f2547714bbe19d9 to your computer and use it in GitHub Desktop.
A lightweight message bus based on Reactive Extensions for Python (RxPY)
import rx
from rx import operators as ops
from rx.subject import Subject
class MessageBus:
    """Minimal in-process message bus built on an RxPY ``Subject``.

    Messages are plain Python objects. Handlers are registered per message
    class and are invoked for every dispatched message that passes an
    ``isinstance`` check against that class.
    """

    def __init__(self, scheduler=None):
        """Create a bus with no registered handlers.

        Args:
            scheduler: Optional RxPY scheduler forwarded to ``rx.just`` when
                a message is handed to a handler. ``None`` leaves the choice
                of scheduler to RxPY.
        """
        self.subject = Subject()
        self.scheduler = scheduler

    def dispatch(self, message):
        """Publish ``message`` to the bus.

        Args:
            message: Any object; it is pushed into the underlying subject and
                reaches only the handlers whose registered class it matches.
        """
        self.subject.on_next(message)

    def register(self, message_class, handler):
        """Subscribe ``handler`` to messages that are instances of ``message_class``.

        Args:
            message_class: Class (or anything else accepted as the second
                argument of ``isinstance``) used to select messages.
            handler: Callable invoked with each matching message.

        Returns:
            The subscription object returned by ``subscribe``. Keep it and
            call its ``dispose()`` method to unregister the handler; it can
            be ignored if the handler should live as long as the bus.
        """
        subscription = self.subject.pipe(
            # Only let through the messages this handler asked for.
            ops.filter(lambda message: isinstance(message, message_class)),
            # Re-emit each message via rx.just so the configured scheduler
            # is the one used to deliver it to the handler.
            ops.flat_map(lambda message: rx.just(message, scheduler=self.scheduler)),
        ).subscribe(handler)
        # Previously the subscription was dropped, leaving callers no way to
        # unsubscribe a handler once registered.
        return subscription
if __name__ == "__main__":
    # Small demonstration: two unrelated message types share one handler,
    # registered once per type.
    from dataclasses import dataclass

    @dataclass
    class SomethingHappened:
        # Free-form payload carried by the message.
        data: str

    @dataclass
    class SomethingOtherHappened:
        # Free-form payload carried by the message.
        data: str

    def handler(message):
        # Print the dataclass repr of whatever message was delivered.
        print(message)

    bus = MessageBus()
    bus.register(SomethingHappened, handler)
    bus.register(SomethingOtherHappened, handler)

    # Each registration filters on its own class, so every dispatched message
    # is passed to the handler by exactly one of the two registrations.
    bus.dispatch(SomethingHappened("Something Happened"))
    bus.dispatch(SomethingOtherHappened("Something Other Happened"))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment