Skip to content

Instantly share code, notes, and snippets.

@yreynhout
Last active November 10, 2023 14:34
Show Gist options
  • Save yreynhout/e19a58412961cdc2a05042d2fee039a9 to your computer and use it in GitHub Desktop.
Save yreynhout/e19a58412961cdc2a05042d2fee039a9 to your computer and use it in GitHub Desktop.
Decider Pattern in Python
# You need the following packages: dataclasses-json, esdbclient
import uuid
import json
from esdbclient import EventStoreDBClient, NewEvent, StreamState, RecordedEvent
from esdbclient.exceptions import NotFound
from typing import Callable, Generic, List, TypeVar
from dataclasses import dataclass
from dataclasses_json import dataclass_json
Command = TypeVar("Command")
State = TypeVar("State")
Event = TypeVar("Event")
class Decider(Generic[Command, State, Event]):
def __init__(self, decide: Callable[[Command, State], List[Event]], evolve: Callable[[State, Event], State],
initial_state: State):
self.decide = decide
self.evolve = evolve
self.initial_state = initial_state
class Encoder(Generic[Event]):
def encode(self, event: Event) -> NewEvent:
pass
def decode(self, event: RecordedEvent) -> Event:
pass
class CommandHandler(Generic[Command, State, Event]):
def __init__(self,
decider: Decider[Command, State, Event],
stream_resolver: Callable[[Command], str],
encoder: Encoder[Event]):
self.decider = decider
self.stream_resolver = stream_resolver
self.encoder = encoder
def handle(self, esdbclient: EventStoreDBClient, command: Command):
stream = self.stream_resolver(command)
state = self.decider.initial_state
version: int | StreamState = StreamState.NO_STREAM
try:
for event in esdbclient.read_stream(stream):
state = self.decider.evolve(state, self.encoder.decode(event))
version = event.stream_position
except NotFound:
pass
events = []
for event in self.decider.decide(command, state):
events.append(self.encoder.encode(event))
return esdbclient.append_to_stream(
stream_name=stream,
current_version=version,
events=events)
@dataclass_json
@dataclass
class RegisterUserCommand:
id: str
email: str
password: str
@dataclass_json
@dataclass
class ResetUserPasswordCommand:
id: str
password: str
UserCommand = RegisterUserCommand | ResetUserPasswordCommand
@dataclass_json
@dataclass
class UserRegisteredEvent:
id: str
email: str
password: str
@dataclass_json
@dataclass
class UserPasswordResetEvent:
id: str
old_password: str
new_password: str
UserEvent = UserRegisteredEvent | UserPasswordResetEvent
@dataclass_json
@dataclass
class UnregisteredUserState:
pass
@dataclass_json
@dataclass
class RegisteredUserState:
id: str
email: str
password: str
UserState = UnregisteredUserState | RegisteredUserState
class UserEventEncoder(Encoder[UserEvent]):
def encode(self, event: UserEvent) -> NewEvent:
match event:
case UserRegisteredEvent() as registered:
return NewEvent(
type="user_registered_event",
data=registered.to_json().encode("utf-8")
)
case UserPasswordResetEvent() as password_reset:
return NewEvent(
type="user_password_reset_event",
data=password_reset.to_json().encode("utf-8")
)
def decode(self, event: RecordedEvent) -> UserEvent:
match event.type:
case "user_registered_event":
return UserRegisteredEvent.from_json(event.data.decode("utf-8"))
case "user_password_reset_event":
return UserPasswordResetEvent.from_json(event.data.decode("utf-8"))
def user_decide(command: UserCommand, state: UserState) -> List[UserEvent]:
match (state, command):
case (UnregisteredUserState(), RegisterUserCommand(id=identifier, email=email, password=password)):
return [UserRegisteredEvent(identifier, email, password)]
case (RegisteredUserState(id=identifier, email=_, password=old_password)), \
ResetUserPasswordCommand(_, password=new_password):
return [UserPasswordResetEvent(identifier, old_password, new_password)]
return []
def user_evolve(state: UserState, event: UserEvent) -> UserState:
match (state, event):
case (UnregisteredUserState(), UserRegisteredEvent(id=identifier, email=email, password=password)):
return RegisteredUserState(identifier, email, password)
case (RegisteredUserState(id=identifier, email=email, password=_), UserPasswordResetEvent(_, _, new_password=password)):
return RegisteredUserState(identifier, email, password)
return state
user_decider = Decider[UserCommand, UserState, UserEvent](
decide=user_decide,
evolve=user_evolve,
initial_state=UnregisteredUserState()
)
handler = CommandHandler[UserCommand, UserState, UserEvent](
decider=user_decider,
stream_resolver=lambda command: f"user-{command.id}",
encoder=UserEventEncoder()
)
client = EventStoreDBClient(
uri="esdb://localhost:2113?Tls=false"
)
handler.handle(client, RegisterUserCommand("1", "john@doe.com", "my-p@ssw0rd"))
handler.handle(client, ResetUserPasswordCommand("1", "my-new-p@ssw0rd"))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment