# Tested on Python 3.9 with only injector installed (pip install injector==0.18.4)
from dataclasses import dataclass | |
from typing import TypeVar, Generic | |
from injector import Injector, Module, provider | |
# Type variable standing for the command type a handler accepts.
TCommand = TypeVar("TCommand")


class Handler(Generic[TCommand]):
    """Generic base class for callables that process a command.

    The class is parameterised by the command type, e.g. ``Handler[int]``.
    The base implementation has no behaviour of its own; subclasses are
    expected to override ``__call__``.
    """

    def __call__(self, command: TCommand) -> None:
        """Handle ``command``.

        Raises:
            NotImplementedError: always, in this base implementation.
        """
        raise NotImplementedError()
@dataclass(frozen=True)
class Enrol:
    """Immutable command object identifying a student and a course."""

    # Integer identifier of the student.
    student_id: int
    # Integer identifier of the course.
    course_id: int
class EnrolHandler(Handler[Enrol]):
    """Handler for ``Enrol`` commands that prints the command it receives."""

    def __call__(self, command: Enrol) -> None:
        """Print ``command`` to standard output, prefixed with ``command: ``."""
        message = f"command: {command}"
        print(message)
class Enrolment(Module):
    """Injector module that supplies the handler for ``Enrol`` commands."""

    @provider
    def enrol_handler(self) -> Handler[Enrol]:
        """Create the handler object provided for ``Handler[Enrol]``.

        injector's ``@provider`` takes the binding key from the return
        annotation, so the annotation must stay ``Handler[Enrol]``.
        """
        handler = EnrolHandler()
        return handler
class CommandBus:
    """Dispatch commands to handlers looked up in an injector container."""

    def __init__(self, container: Injector) -> None:
        """Keep a reference to the container used for handler lookups.

        Args:
            container: Injector queried for ``Handler[...]`` instances.
        """
        self._container = container

    def handle(self, command: TCommand) -> None:
        """Fetch the handler registered for ``type(command)`` and call it.

        Args:
            command: The command instance to dispatch.

        Any exception raised by the container lookup or by the handler
        itself propagates to the caller unchanged.
        """
        # Builtin generic ``type[...]`` (Python 3.9+) is used here because
        # ``typing.Type`` is not imported by this module.
        command_cls: type[TCommand] = type(command)
        # The lookup key is built at runtime from the concrete command class.
        handler = self._container.get(Handler[command_cls])
        handler(command)
# Build the container from the Enrolment module only. auto_bind is turned off
# in this call, so lookups rely on what the module provides.
container = Injector(
    [Enrolment()],
    auto_bind=False,
)

# Route one sample command through the bus.
command_bus = CommandBus(container)
command_bus.handle(
    Enrol(student_id=123000, course_id=666),
)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment