Skip to content

Instantly share code, notes, and snippets.

@mypy-play
Created April 3, 2023 18:47
Show Gist options
  • Save mypy-play/a14338d6ab8d5c26b17cfabbe794ef6f to your computer and use it in GitHub Desktop.
Save mypy-play/a14338d6ab8d5c26b17cfabbe794ef6f to your computer and use it in GitHub Desktop.
Shared via mypy Playground
from typing import *
T = TypeVar("T")
V = TypeVar("V")
class Input(Protocol[T]):
def send(self, value: T, /) -> None:
...
class Connectable(Protocol[T]):
def connect(self, other: V := InputLike[T], /) -> V:
...
class Broadcaster(Input[T], Connectable[T]):
def __init__(self) -> None:
self._connections = List[InputLike[T]] = []
def send(self, arg: T, /) -> None:
for c in self._connections:
c.send(arg)
def connect(self, other: V := InputLike[T], /) -> V:
self._connections.append(other)
return other
class Function(Input[T], Broadcaster[U]):
def __init__(self, func: Callable[[T], U]) -> None:
self._func = func
super().__init__()
def send(self, arg: T, /) -> None:
return super().send(self._func(arg))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment