Last active
April 30, 2024 21:08
-
-
Save Garciat/1484d16ae455ca791147a1eab0836f9a to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from abc import ABC | |
from abc import abstractmethod | |
from abc import abstractproperty | |
from dataclasses import dataclass, field | |
from typing import Any | |
from typing import AsyncGenerator | |
from typing import Awaitable | |
from typing import Callable | |
from typing import Generator | |
from typing import Generic | |
from typing import Iterator | |
from typing import List | |
from typing import NoReturn | |
from typing import Optional | |
from typing import Tuple | |
from typing import TypeVar | |
from typing import Union | |
from typing import cast | |
_T = TypeVar('_T') | |
### | |
class _PromiseSentinel: pass | |
_PROMISE_SENTINEL = _PromiseSentinel() | |
@dataclass | |
class Promise(Generic[_T]): | |
_result: Union[_PromiseSentinel, _T] = _PROMISE_SENTINEL | |
def complete(self, value: _T) -> None: | |
if isinstance(self._result, _PromiseSentinel): | |
self._result = value | |
else: | |
raise Exception('Promise already set') | |
@property | |
def value(self) -> _T: | |
if isinstance(self._result, _PromiseSentinel): | |
raise Exception('Promise is not set') | |
else: | |
return self._result | |
def impossible() -> NoReturn: raise Exception('impossible') | |
### | |
# Effects implementation | |
class Answer(Generic[_T], ABC): | |
@abstractproperty | |
def value(self) -> _T: ... | |
@dataclass(frozen = True) | |
class _AnswerImpl(Generic[_T], Answer[_T]): | |
_value: _T | |
@property | |
def value(self) -> _T: | |
return self._value | |
class Effect(Generic[_T], ABC): | |
def answer(self, value: _T) -> Answer[_T]: | |
return _AnswerImpl(value) | |
@dataclass(frozen = True) | |
class EffectFuture(Generic[_T], Awaitable[_T]): | |
effect: Effect[_T] | |
promise: Promise[_T] = field(default_factory=Promise) | |
def __await__(self) -> Generator[Any, None, _T]: | |
yield self | |
return self.promise.value | |
class EffectHandler(ABC): | |
@abstractmethod | |
async def handle(self, effect: Effect[Any]) -> Optional[Answer[Any]]: ... | |
EffectAction = Union[EffectFuture[Any]] | |
def run_effects(handler: EffectHandler, awaitable: Awaitable[_T]) -> _T: | |
gen = cast(Generator[EffectAction, None, _T], awaitable.__await__()) | |
try: | |
while True: | |
action = gen.send(None) | |
if isinstance(action, EffectFuture): | |
answer = run_effects(handler, handler.handle(action.effect)) | |
if isinstance(answer, Answer): | |
action.promise.complete(answer.value) | |
else: | |
raise Exception('Unhandled effect: {!r}'.format(action.effect)) | |
else: | |
raise Exception('Unexpected action: {!r}'.format(action)) | |
except StopIteration as stop: | |
return cast(_T, stop.value) | |
finally: | |
gen.close() | |
class HandlerStack(EffectHandler): | |
_handlers: Tuple[EffectHandler, ...] | |
def __init__(self, *handlers: EffectHandler): | |
self._handlers = handlers | |
async def handle(self, effect: Effect[Any]) -> Optional[Answer[Any]]: | |
for handler in self._handlers: | |
answer = await handler.handle(effect) | |
if answer is not None: | |
return answer | |
return None | |
### | |
# Domain & Effects | |
@dataclass(frozen = True) | |
class Tweet: ... | |
@dataclass(frozen=True) | |
class FacebookUser: ... | |
@dataclass(frozen = True) | |
class GetTweets(Effect[List[Tweet]]): | |
user_id: str | |
@dataclass(frozen = True) | |
class GetFollowers(Effect[List[str]]): | |
user_id: str | |
@dataclass(frozen = True) | |
class GetFriends(Effect[List[FacebookUser]]): | |
user_id: str | |
### | |
# Helpers that map ValueEffects into async functions | |
async def get_tweets(user_id: str) -> List[Tweet]: | |
return await EffectFuture(GetTweets(user_id=user_id)) | |
async def get_followers(user_id: str) -> List[str]: | |
return await EffectFuture(GetFollowers(user_id=user_id)) | |
async def get_friends(user_id: str) -> List[FacebookUser]: | |
return await EffectFuture(GetFriends(user_id=user_id)) | |
### | |
# Lower-level effects | |
@dataclass(frozen=True) | |
class HttpRequest: | |
path: str | |
@dataclass(frozen=True) | |
class HttpResponse: ... | |
@dataclass(frozen=True) | |
class SendHttp(Effect[HttpResponse]): | |
request: HttpRequest | |
async def send_http(request: HttpRequest) -> HttpResponse: | |
return await EffectFuture(SendHttp(request=request)) | |
### | |
# Handler implementations | |
class StubTwitterHandler(EffectHandler): | |
async def handle(self, effect: Effect[Any]) -> Optional[Answer[Any]]: | |
if isinstance(effect, GetTweets): | |
print(await send_http(HttpRequest(path='/tweets/{}'.format(effect.user_id)))) | |
return effect.answer([Tweet(), Tweet()]) | |
elif isinstance(effect, GetFollowers): | |
return effect.answer(['follow1', 'follow2']) | |
else: | |
return None | |
class StubFacebookHandler(EffectHandler): | |
async def handle(self, effect: Effect[Any]) -> Optional[Answer[Any]]: | |
if isinstance(effect, GetFriends): | |
return effect.answer([FacebookUser()]) | |
else: | |
return None | |
class StubHttpHandler(EffectHandler): | |
async def handle(self, effect: Effect[Any]) -> Optional[Answer[Any]]: | |
if isinstance(effect, SendHttp): | |
return effect.answer(HttpResponse()) | |
else: | |
return None | |
class EffectLogger(EffectHandler): | |
async def handle(self, effect: Effect[Any]) -> Optional[Answer[Any]]: | |
print('EffectLogger:', repr(effect)) | |
return None | |
### | |
# Example program | |
async def print_tweets(user_id: str) -> int: | |
tweets = await get_tweets(user_id) | |
for t in tweets: | |
print(t) | |
print(await get_followers('someone_else')) | |
print(await get_friends('yet_another_one')) | |
return len(tweets) | |
def _main() -> None: | |
handler = HandlerStack( | |
EffectLogger(), | |
StubTwitterHandler(), | |
StubFacebookHandler(), | |
StubHttpHandler(), | |
) | |
print('final output:', run_effects(handler, print_tweets('garciat'))) | |
if __name__ == "__main__": | |
_main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment