Skip to content

Instantly share code, notes, and snippets.

@ndy40
Last active January 29, 2025 17:12
EventBus class
import importlib
import inspect
import os
from functools import wraps
from typing import List, Callable, Dict, Type, ClassVar
class EventBus:
listeners: Dict[str, List[Callable]]
def __init__(self):
self.listeners = {}
def register(self, event: str | Type['Event']):
def decorator(func: Callable):
from .events import Event
name = event.__name__ if isinstance(event, type) and issubclass(event, Event) else event
self.listeners.setdefault(name, []).append(func)
setattr(func, '_is_registered_listener', True)
return func
return decorator
def dispatch(self, event: str | Type['Event'], **kwargs):
from deal_finder.events import Event
event_data = {"event": event}
handlers = self.get_handlers(event.__class__.__name__ if isinstance(event, Event) else event)
for listener in handlers:
print(f"Dispatching event {event} to {listener}")
listener(**event_data, **kwargs)
def get_handlers(self, event: str | Type['Event']):
from deal_finder.events import Event
if isinstance(event, type) and issubclass(event, Event):
name = event.__class__.__name__
elif isinstance(event, str):
name = event
return self.listeners.get(name, [])
@staticmethod
def scan_for_listeners():
global bus
# Scan the current directory for Python files
for root, _, files in os.walk(os.getcwd()):
for file in files:
try:
if file.endswith('handlers.py'):
module_name = os.path.splitext(file)[0]
module = importlib.import_module(module_name)
for name, obj in inspect.getmembers(module):
if inspect.isfunction(obj) and hasattr(obj, '_is_registered_listener'):
bus.register(obj)
except Exception as e:
...
bus = EventBus()
from dataclasses import dataclass, field
from uuid import UUID, uuid4
@dataclass(frozen=True)
class Event:
...
@dataclass(frozen=True)
class DealFound(Event):
deal_url: str
user_id: UUID
@dataclass(frozen=True)
class UserRegistered(Event):
user_id: UUID
email: str
interest_tags: list[str] = field(default_factory=list)
from deal_finder import register_listener
from deal_finder.events import UserRegistered
@register_listener(UserRegistered)
def log_user_registered(event: UserRegistered, logger, **kwargs):
logger.info(f"User {event.user_id} registered")
@register_listener(UserRegistered)
def send_welcome_email(event: UserRegistered, logger, **kwargs):
logger.info(f"Welcome email sent to {event.email}")
import uuid
from deal_finder.event_bus import event_bus
from deal_finder.events import UserRegistered
def main():
event = UserRegistered(
user_id=uuid.uuid4(),
interest_tags=['travel', 'food', 'sports'],
email='user@email.com',
)
event_bus.scan_for_listeners()
event_bus.dispatch(event, logger_name='user_service')
if __name__ == '__main__':
main()
from dataclasses import dataclass
from unittest import TestCase
from deal_finder.event_bus import EventBus
from deal_finder.events import Event
class TestEventBus(TestCase):
def test_registering_of_handler_for_event(self):
event_bus = EventBus()
@dataclass(frozen=True)
class HelloEvent(Event):
name: str = "hello"
@event_bus.register(HelloEvent)
def hello_handler(event: Event):
assert event.name == "hello"
assert len(event_bus.listeners) == 1
def test_registering_multiple_handlers_for_event(self):
event_bus = EventBus()
@dataclass(frozen=True)
class DummyEvent(Event):
...
@event_bus.register(DummyEvent)
def handler1(*args, **kwargs):
print('Handler 1')
#
@event_bus.register(DummyEvent)
def handler2(*args, **kwargs):
print('Handler 2')
assert len(event_bus.get_handlers(DummyEvent)) == 2
def test_both_handlers_called_for_dummy_event(self):
event_bus = EventBus()
@dataclass(frozen=True)
class DummyEvent(Event):
...
handler_calls = []
@event_bus.register(DummyEvent)
def handler1(*args, **kwargs):
handler_calls.append('handler1_called')
@event_bus.register(DummyEvent)
def handler2(*args, **kwargs):
handler_calls.append('handler2_called')
event_bus.dispatch(DummyEvent())
assert 'handler1_called' in handler_calls
assert 'handler2_called' in handler_calls
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment