Last active
January 29, 2025 17:12
EventBus class
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import importlib | |
import inspect | |
import os | |
from functools import wraps | |
from typing import List, Callable, Dict, Type, ClassVar | |
class EventBus: | |
listeners: Dict[str, List[Callable]] | |
def __init__(self): | |
self.listeners = {} | |
def register(self, event: str | Type['Event']): | |
def decorator(func: Callable): | |
from .events import Event | |
name = event.__name__ if isinstance(event, type) and issubclass(event, Event) else event | |
self.listeners.setdefault(name, []).append(func) | |
setattr(func, '_is_registered_listener', True) | |
return func | |
return decorator | |
def dispatch(self, event: str | Type['Event'], **kwargs): | |
from deal_finder.events import Event | |
event_data = {"event": event} | |
handlers = self.get_handlers(event.__class__.__name__ if isinstance(event, Event) else event) | |
for listener in handlers: | |
print(f"Dispatching event {event} to {listener}") | |
listener(**event_data, **kwargs) | |
def get_handlers(self, event: str | Type['Event']): | |
from deal_finder.events import Event | |
if isinstance(event, type) and issubclass(event, Event): | |
name = event.__class__.__name__ | |
elif isinstance(event, str): | |
name = event | |
return self.listeners.get(name, []) | |
@staticmethod | |
def scan_for_listeners(): | |
global bus | |
# Scan the current directory for Python files | |
for root, _, files in os.walk(os.getcwd()): | |
for file in files: | |
try: | |
if file.endswith('handlers.py'): | |
module_name = os.path.splitext(file)[0] | |
module = importlib.import_module(module_name) | |
for name, obj in inspect.getmembers(module): | |
if inspect.isfunction(obj) and hasattr(obj, '_is_registered_listener'): | |
bus.register(obj) | |
except Exception as e: | |
... | |
bus = EventBus() |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from dataclasses import dataclass, field | |
from uuid import UUID, uuid4 | |
@dataclass(frozen=True)
class Event:
    """Immutable base class for events; declares no fields of its own."""
@dataclass(frozen=True)
class DealFound(Event):
    """Immutable event carrying a deal URL and a user id."""

    # Both fields are required and keep their declared order.
    deal_url: str
    user_id: UUID
@dataclass(frozen=True)
class UserRegistered(Event):
    """Immutable event carrying a user id, an email and optional interest tags."""

    user_id: UUID
    email: str
    # A factory gives every instance its own list instead of a shared default.
    interest_tags: list[str] = field(default_factory=list)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from deal_finder import register_listener | |
from deal_finder.events import UserRegistered | |
@register_listener(UserRegistered)
def log_user_registered(event: UserRegistered, logger, **kwargs):
    """Log, at info level, the id of the user carried by ``event``.

    Extra keyword arguments are accepted and ignored.
    """
    message = f"User {event.user_id} registered"
    logger.info(message)
@register_listener(UserRegistered)
def send_welcome_email(event: UserRegistered, logger, **kwargs):
    """Log, at info level, that a welcome email went to ``event.email``.

    Only a log entry is written here; no mail is sent by this function.
    Extra keyword arguments are accepted and ignored.
    """
    message = f"Welcome email sent to {event.email}"
    logger.info(message)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import uuid | |
from deal_finder.event_bus import event_bus | |
from deal_finder.events import UserRegistered | |
def main():
    """Build a sample ``UserRegistered`` event, scan for listeners and dispatch it."""
    registration = UserRegistered(
        email='user@email.com',
        interest_tags=['travel', 'food', 'sports'],
        user_id=uuid.uuid4(),
    )
    # Scan first so handler modules are imported before the event goes out.
    event_bus.scan_for_listeners()
    event_bus.dispatch(registration, logger_name='user_service')


if __name__ == '__main__':
    main()
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from dataclasses import dataclass | |
from unittest import TestCase | |
from deal_finder.event_bus import EventBus | |
from deal_finder.events import Event | |
class TestEventBus(TestCase):
    """Tests for registering handlers on an ``EventBus`` and dispatching to them."""

    def test_registering_of_handler_for_event(self):
        """A registered handler is stored and receives the dispatched event."""
        event_bus = EventBus()

        @dataclass(frozen=True)
        class HelloEvent(Event):
            name: str = "hello"

        received = []

        @event_bus.register(HelloEvent)
        def hello_handler(event: Event):
            # Record what arrived so the check runs in the test body, not in
            # a handler that might never be called.
            received.append(event.name)

        self.assertEqual(len(event_bus.listeners), 1)
        event_bus.dispatch(HelloEvent())
        self.assertEqual(received, ["hello"])

    def test_registering_multiple_handlers_for_event(self):
        """Two handlers registered for one event class are both returned."""
        event_bus = EventBus()

        @dataclass(frozen=True)
        class DummyEvent(Event):
            ...

        @event_bus.register(DummyEvent)
        def handler1(*args, **kwargs):
            print('Handler 1')

        @event_bus.register(DummyEvent)
        def handler2(*args, **kwargs):
            print('Handler 2')

        self.assertEqual(len(event_bus.get_handlers(DummyEvent)), 2)

    def test_both_handlers_called_for_dummy_event(self):
        """Dispatch calls each handler exactly once, in registration order."""
        event_bus = EventBus()

        @dataclass(frozen=True)
        class DummyEvent(Event):
            ...

        handler_calls = []

        @event_bus.register(DummyEvent)
        def handler1(*args, **kwargs):
            handler_calls.append('handler1_called')

        @event_bus.register(DummyEvent)
        def handler2(*args, **kwargs):
            handler_calls.append('handler2_called')

        event_bus.dispatch(DummyEvent())

        self.assertEqual(handler_calls, ['handler1_called', 'handler2_called'])
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment