Skip to content

Instantly share code, notes, and snippets.

Last active December 18, 2022 02:28
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
Star You must be signed in to star a gist
Save Garciat/1484d16ae455ca791147a1eab0836f9a to your computer and use it in GitHub Desktop.
from abc import ABC
from abc import abstractmethod
from abc import abstractproperty
from dataclasses import dataclass, field
from typing import Any
from typing import AsyncGenerator
from typing import Awaitable
from typing import Callable
from typing import Generator
from typing import Generic
from typing import Iterator
from typing import List
from typing import NoReturn
from typing import Optional
from typing import Tuple
from typing import TypeVar
from typing import Union
from typing import cast
_T = TypeVar('_T')
class _PromiseSentinel: pass
_PROMISE_SENTINEL = _PromiseSentinel()
class Promise(Generic[_T]):
_result: Union[_PromiseSentinel, _T] = _PROMISE_SENTINEL
def complete(self, value: _T) -> None:
if isinstance(self._result, _PromiseSentinel):
self._result = value
raise Exception('Promise already set')
def value(self) -> _T:
if isinstance(self._result, _PromiseSentinel):
raise Exception('Promise is not set')
return self._result
def impossible() -> NoReturn: raise Exception('impossible')
# Effects implementation
class Answer(Generic[_T], ABC):
def value(self) -> _T: ...
@dataclass(frozen = True)
class _AnswerImpl(Generic[_T], Answer[_T]):
_value: _T
def value(self) -> _T:
return self._value
class Effect(Generic[_T], ABC):
def answer(self, value: _T) -> Answer[_T]:
return _AnswerImpl(value)
@dataclass(frozen = True)
class EffectFuture(Generic[_T], Awaitable[_T]):
effect: Effect[_T]
promise: Promise[_T] = field(default_factory=Promise)
def __await__(self) -> Generator[Any, None, _T]:
yield self
return self.promise.value
class EffectHandler(ABC):
async def handle(self, effect: Effect[Any]) -> Optional[Answer[Any]]: ...
EffectAction = Union[EffectFuture[Any]]
def run_effects(handler: EffectHandler, awaitable: Awaitable[_T]) -> _T:
gen = cast(Generator[EffectAction, None, _T], awaitable.__await__())
while True:
action = gen.send(None)
if isinstance(action, EffectFuture):
answer = run_effects(handler, handler.handle(action.effect))
if isinstance(answer, Answer):
raise Exception('Unhandled effect: {!r}'.format(action.effect))
raise Exception('Unexpected action: {!r}'.format(action))
except StopIteration as stop:
return cast(_T, stop.value)
class HandlerStack(EffectHandler):
_handlers: Tuple[EffectHandler, ...]
def __init__(self, *handlers: EffectHandler):
self._handlers = handlers
async def handle(self, effect: Effect[Any]) -> Optional[Answer[Any]]:
for handler in self._handlers:
answer = await handler.handle(effect)
if answer is not None:
return answer
return None
# Domain & Effects
@dataclass(frozen = True)
class Tweet: ...
class FacebookUser: ...
@dataclass(frozen = True)
class GetTweets(Effect[List[Tweet]]):
user_id: str
@dataclass(frozen = True)
class GetFollowers(Effect[List[str]]):
user_id: str
@dataclass(frozen = True)
class GetFriends(Effect[List[FacebookUser]]):
user_id: str
# Helpers that map ValueEffects into async functions
async def get_tweets(user_id: str) -> List[Tweet]:
return await EffectFuture(GetTweets(user_id=user_id))
async def get_followers(user_id: str) -> List[str]:
return await EffectFuture(GetFollowers(user_id=user_id))
async def get_friends(user_id: str) -> List[FacebookUser]:
return await EffectFuture(GetFriends(user_id=user_id))
# Lower-level effects
class HttpRequest:
path: str
class HttpResponse: ...
class SendHttp(Effect[HttpResponse]):
request: HttpRequest
async def send_http(request: HttpRequest) -> HttpResponse:
return await EffectFuture(SendHttp(request=request))
# Handler implementations
class StubTwitterHandler(EffectHandler):
async def handle(self, effect: Effect[Any]) -> Optional[Answer[Any]]:
if isinstance(effect, GetTweets):
print(await send_http(HttpRequest(path='/tweets/{}'.format(effect.user_id))))
return effect.answer([Tweet(), Tweet()])
elif isinstance(effect, GetFollowers):
return effect.answer(['follow1', 'follow2'])
return None
class StubFacebookHandler(EffectHandler):
async def handle(self, effect: Effect[Any]) -> Optional[Answer[Any]]:
if isinstance(effect, GetFriends):
return effect.answer([FacebookUser()])
return None
class StubHttpHandler(EffectHandler):
async def handle(self, effect: Effect[Any]) -> Optional[Answer[Any]]:
if isinstance(effect, SendHttp):
return effect.answer(HttpResponse())
return None
class EffectLogger(EffectHandler):
async def handle(self, effect: Effect[Any]) -> Optional[Answer[Any]]:
print('EffectLogger:', repr(effect))
return None
# Example program
async def print_tweets(user_id: str) -> int:
tweets = await get_tweets(user_id)
for t in tweets:
print(await get_followers('someone_else'))
print(await get_friends('yet_another_one'))
return len(tweets)
def _main() -> None:
handler = HandlerStack(
print('final output:', run_effects(handler, print_tweets('garciat')))
if __name__ == "__main__":
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment