Skip to content

Instantly share code, notes, and snippets.

@ktbarrett
Last active March 9, 2022 17:41
Show Gist options
  • Save ktbarrett/1730d05a297d8dc5a394aee11fd48378 to your computer and use it in GitHub Desktop.
Save ktbarrett/1730d05a297d8dc5a394aee11fd48378 to your computer and use it in GitHub Desktop.
from collections import deque
from typing import Deque, Generic, Optional, TypeVar
from cocotb.triggers import Event
T = TypeVar("T")
class RecvFailed(Exception):
...
class SendFailed(Exception):
...
class Channel(Generic[T]):
def __init__(self, maxsize: Optional[int] = None) -> None:
if isinstance(maxsize, int) and maxsize < 0:
raise ValueError("maxsize cannot be less than 0")
if maxsize == 0:
# fully synchronous case
raise NotImplementedError("maxsize == 0 case is not supported yet")
self._queue: Deque[T] = deque()
self._maxsize = maxsize
self._send_event = Event()
self._recv_event = Event()
@property
def maxsize(self) -> Optional[int]:
return self._maxsize
def _send(self, __value: T) -> None:
self._queue.append(__value)
self._send_event.set()
def send_nowait(self, __value: T) -> None:
if not self.send_available():
raise SendFailed
else:
self._send(__value)
async def send(self, __value: T) -> None:
await self.until_send_available()
self._send(__value)
def send_available(self) -> bool:
return self._maxsize is None or len(self._queue) < self._maxsize
async def until_send_available(self) -> None:
while not self.send_available():
self._recv_event.clear()
await self._recv_event.wait()
def _recv(self) -> T:
self._recv_event.set()
return self._queue.popleft()
def recv_nowait(self) -> T:
if not self.recv_available():
raise RecvFailed
else:
return self._recv()
async def recv(self) -> T:
await self.until_recv_available()
return self._recv()
def recv_available(self) -> bool:
return len(self._queue) > 0
async def until_recv_available(self) -> None:
while not self.recv_available():
self._send_event.clear()
await self._send_event.wait()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment