Skip to content

Instantly share code, notes, and snippets.

@Garciat
Last active April 30, 2024 21:08
Show Gist options
  • Save Garciat/1484d16ae455ca791147a1eab0836f9a to your computer and use it in GitHub Desktop.
Save Garciat/1484d16ae455ca791147a1eab0836f9a to your computer and use it in GitHub Desktop.
from abc import ABC
from abc import abstractmethod
from abc import abstractproperty
from dataclasses import dataclass, field
from typing import Any
from typing import AsyncGenerator
from typing import Awaitable
from typing import Callable
from typing import Generator
from typing import Generic
from typing import Iterator
from typing import List
from typing import NoReturn
from typing import Optional
from typing import Tuple
from typing import TypeVar
from typing import Union
from typing import cast
_T = TypeVar('_T')
###
class _PromiseSentinel:
    """Marker type representing the "no value yet" state of a ``Promise``."""


_PROMISE_SENTINEL = _PromiseSentinel()


@dataclass
class Promise(Generic[_T]):
    """Write-once container for a single value of type ``_T``."""

    # Holds the sentinel until ``complete`` stores the real value.
    _result: Union[_PromiseSentinel, _T] = _PROMISE_SENTINEL

    def complete(self, value: _T) -> None:
        """Store ``value`` in the promise.

        Raises:
            Exception: if a value has already been stored.
        """
        if not isinstance(self._result, _PromiseSentinel):
            raise Exception('Promise already set')
        self._result = value

    @property
    def value(self) -> _T:
        """Return the stored value.

        Raises:
            Exception: if ``complete`` has not been called yet.
        """
        current = self._result
        if isinstance(current, _PromiseSentinel):
            raise Exception('Promise is not set')
        return current
def impossible() -> NoReturn:
    """Unconditionally raise; marks a code path that should never run."""
    raise Exception('impossible')
###
# Effects implementation
class Answer(Generic[_T], ABC):
    """Abstract wrapper around the value produced for an effect.

    Handlers return ``None`` for effects they do not handle, so wrapping the
    result in an ``Answer`` lets a handled effect carry any value at all.
    """

    # ``abc.abstractproperty`` has been deprecated since Python 3.3; stacking
    # ``property`` on top of ``abstractmethod`` is the supported spelling and
    # keeps the attribute abstract for ABC instantiation checks.
    @property
    @abstractmethod
    def value(self) -> _T:
        """Return the value carried by this answer."""
@dataclass(frozen=True)
class _AnswerImpl(Generic[_T], Answer[_T]):
    """Immutable ``Answer`` that simply stores the value it was given."""

    _value: _T

    @property
    def value(self) -> _T:
        """Return the stored value."""
        stored = self._value
        return stored
class Effect(Generic[_T], ABC):
    """Base class for effect descriptions; ``_T`` is the effect's result type."""

    def answer(self, value: _T) -> Answer[_T]:
        """Wrap ``value`` in an ``Answer`` typed for this effect."""
        wrapped: Answer[_T] = _AnswerImpl(value)
        return wrapped
@dataclass(frozen=True)
class EffectFuture(Generic[_T], Awaitable[_T]):
    """Awaitable pairing an effect with the promise that will hold its result."""

    effect: Effect[_T]
    promise: Promise[_T] = field(default_factory=Promise)

    def __await__(self) -> Generator[Any, None, _T]:
        """Yield this future to the driver, then return the promise's value."""
        # Suspend, handing this object to whoever is driving the coroutine.
        yield self
        # On resumption the result is read back from the promise.
        outcome = self.promise.value
        return outcome
class EffectHandler(ABC):
    """Interface for objects that interpret effects."""

    @abstractmethod
    async def handle(self, effect: Effect[Any]) -> Optional[Answer[Any]]:
        """Return an ``Answer`` for ``effect``, or ``None`` if it is not handled."""
# Type of the objects a driven coroutine may yield to ``run_effects``.
# Currently the only member is ``EffectFuture``.
EffectAction = Union[EffectFuture[Any]]
def run_effects(handler: EffectHandler, awaitable: Awaitable[_T]) -> _T:
    """Drive ``awaitable`` to completion, resolving its effects via ``handler``.

    Each ``EffectFuture`` yielded by the awaitable is passed to
    ``handler.handle``; the handler coroutine is itself driven by a recursive
    call, so handlers may await further effects. The resulting answer's value
    is stored in the future's promise before the awaitable is resumed.

    Args:
        handler: Handler consulted for every yielded effect.
        awaitable: The awaitable to run.

    Returns:
        The value the awaitable finishes with.

    Raises:
        Exception: if the handler does not return an ``Answer`` for an effect
            ('Unhandled effect'), or if the awaitable yields something other
            than an ``EffectFuture`` ('Unexpected action').
    """
    gen = cast(Generator[EffectAction, None, _T], awaitable.__await__())
    try:
        while True:
            # Only the resumption itself may signal completion. Keeping the
            # ``try`` this narrow stops a stray StopIteration from handler
            # code, ``answer.value`` or ``promise.complete`` being mistaken
            # for the awaitable's return value.
            try:
                action = gen.send(None)
            except StopIteration as stop:
                return cast(_T, stop.value)

            if not isinstance(action, EffectFuture):
                raise Exception('Unexpected action: {!r}'.format(action))

            # Handlers are coroutines too, so run them with the same driver.
            answer = run_effects(handler, handler.handle(action.effect))
            if not isinstance(answer, Answer):
                raise Exception('Unhandled effect: {!r}'.format(action.effect))
            action.promise.complete(answer.value)
    finally:
        # Always close the underlying generator, including on error paths.
        gen.close()
class HandlerStack(EffectHandler):
    """Handler that tries a sequence of handlers in order."""

    _handlers: Tuple[EffectHandler, ...]

    def __init__(self, *handlers: EffectHandler):
        """Remember ``handlers`` in the order they were given."""
        self._handlers = handlers

    async def handle(self, effect: Effect[Any]) -> Optional[Answer[Any]]:
        """Return the first non-``None`` answer, or ``None`` if there is none."""
        for candidate in self._handlers:
            result = await candidate.handle(effect)
            if result is None:
                # This handler declined; move on to the next one.
                continue
            return result
        return None
###
# Domain & Effects
@dataclass(frozen=True)
class Tweet:
    """Placeholder domain object for a tweet; it has no fields."""
@dataclass(frozen=True)
class FacebookUser:
    """Placeholder domain object for a Facebook user; it has no fields."""
@dataclass(frozen=True)
class GetTweets(Effect[List[Tweet]]):
    """Effect requesting the tweets of ``user_id``; its result is a list of ``Tweet``."""

    user_id: str
@dataclass(frozen=True)
class GetFollowers(Effect[List[str]]):
    """Effect requesting the followers of ``user_id``; its result is a list of strings."""

    user_id: str
@dataclass(frozen=True)
class GetFriends(Effect[List[FacebookUser]]):
    """Effect requesting the friends of ``user_id``; its result is a list of ``FacebookUser``."""

    user_id: str
###
# Helpers that wrap each Effect in an EffectFuture so it can be awaited from async functions
async def get_tweets(user_id: str) -> List[Tweet]:
    """Await a ``GetTweets`` effect for ``user_id`` and return its result."""
    request = GetTweets(user_id=user_id)
    return await EffectFuture(request)
async def get_followers(user_id: str) -> List[str]:
    """Await a ``GetFollowers`` effect for ``user_id`` and return its result."""
    request = GetFollowers(user_id=user_id)
    return await EffectFuture(request)
async def get_friends(user_id: str) -> List[FacebookUser]:
    """Await a ``GetFriends`` effect for ``user_id`` and return its result."""
    request = GetFriends(user_id=user_id)
    return await EffectFuture(request)
###
# Lower-level effects
@dataclass(frozen=True)
class HttpRequest:
    """Immutable description of an HTTP request, identified by its path."""

    path: str
@dataclass(frozen=True)
class HttpResponse:
    """Placeholder for an HTTP response; it has no fields."""
@dataclass(frozen=True)
class SendHttp(Effect[HttpResponse]):
    """Effect asking for ``request`` to be sent; its result is an ``HttpResponse``."""

    request: HttpRequest
async def send_http(request: HttpRequest) -> HttpResponse:
    """Await a ``SendHttp`` effect for ``request`` and return its result."""
    effect = SendHttp(request=request)
    return await EffectFuture(effect)
###
# Handler implementations
class StubTwitterHandler(EffectHandler):
    """Stub handler answering ``GetTweets`` and ``GetFollowers`` with canned data."""

    async def handle(self, effect: Effect[Any]) -> Optional[Answer[Any]]:
        """Answer Twitter effects; return ``None`` for anything else."""
        if isinstance(effect, GetTweets):
            # Issue a lower-level HTTP effect first and print its response.
            request = HttpRequest(path='/tweets/{}'.format(effect.user_id))
            response = await send_http(request)
            print(response)
            return effect.answer([Tweet(), Tweet()])
        if isinstance(effect, GetFollowers):
            return effect.answer(['follow1', 'follow2'])
        return None
class StubFacebookHandler(EffectHandler):
    """Stub handler answering ``GetFriends`` with a single canned user."""

    async def handle(self, effect: Effect[Any]) -> Optional[Answer[Any]]:
        """Answer ``GetFriends``; return ``None`` for anything else."""
        if not isinstance(effect, GetFriends):
            return None
        return effect.answer([FacebookUser()])
class StubHttpHandler(EffectHandler):
    """Stub handler answering ``SendHttp`` with an empty ``HttpResponse``."""

    async def handle(self, effect: Effect[Any]) -> Optional[Answer[Any]]:
        """Answer ``SendHttp``; return ``None`` for anything else."""
        if not isinstance(effect, SendHttp):
            return None
        return effect.answer(HttpResponse())
class EffectLogger(EffectHandler):
    """Handler that prints every effect it sees and never answers one."""

    async def handle(self, effect: Effect[Any]) -> Optional[Answer[Any]]:
        """Print ``effect`` and return ``None``."""
        description = repr(effect)
        print('EffectLogger:', description)
        return None
###
# Example program
async def print_tweets(user_id: str) -> int:
    """Print the tweets of ``user_id`` plus two fixed lookups; return the tweet count."""
    timeline = await get_tweets(user_id)
    for tweet in timeline:
        print(tweet)

    followers = await get_followers('someone_else')
    print(followers)

    friends = await get_friends('yet_another_one')
    print(friends)

    return len(timeline)
def _main() -> None:
    """Run the example program against the stub handlers and print its result."""
    # The logger comes first and always returns None, so each effect is
    # printed before a later handler in the stack answers it.
    stack = HandlerStack(
        EffectLogger(),
        StubTwitterHandler(),
        StubFacebookHandler(),
        StubHttpHandler(),
    )
    result = run_effects(stack, print_tweets('garciat'))
    print('final output:', result)


if __name__ == "__main__":
    _main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment