Created
November 9, 2022 17:26
-
-
Save Finwood/786dd86f414ee253902ca7be0d7263f5 to your computer and use it in GitHub Desktop.
Simple Asynchronous PubSub Implementation
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Inspired by https://gist.github.com/appeltel/fd3ddeeed6c330c7208502462639d2c9 | |
import asyncio | |
import logging | |
from typing import Generic, TypeVar | |
T = TypeVar("T") | |
class Topic(Generic[T]): | |
def __init__(self, name: str): | |
self.name = name | |
self.logger = logging.getLogger(f"{__name__}.topic.{self.name}") | |
self._subscriptions: set[asyncio.Queue[T]] = set() | |
self.logger.info(f"{self} created") | |
def publish(self, message: T) -> None: | |
for queue in self._subscriptions: | |
if queue.full(): | |
# pop oldest item | |
queue.get_nowait() | |
queue.put_nowait(message) | |
self.logger.debug(message) | |
def subscribe(self, depth: int = 0) -> "Subscription[T]": | |
return Subscription(self, depth) | |
def __str__(self) -> str: | |
description = f"Topic '{self.name}'" | |
if self._subscriptions: | |
n = len(self._subscriptions) | |
description += f", {n} subscriber{'s' if n > 1 else ''}" | |
return description | |
def __repr__(self) -> str: | |
return f"{self.__class__.__name__}({self.name!r})" | |
class Subscription(Generic[T]): | |
def __init__(self, topic: Topic[T], depth: int = 0): | |
self._topic = topic | |
self.depth = depth | |
def __enter__(self) -> asyncio.Queue[T]: | |
self._queue: asyncio.Queue[T] = asyncio.Queue(self.depth) | |
self._topic._subscriptions.add(self._queue) | |
self._topic.logger.info("Subscription added") | |
return self._queue | |
def __exit__(self, exc_type, exc_value, traceback) -> None: | |
self._topic._subscriptions.remove(self._queue) | |
self._topic.logger.info("Subscription removed") | |
delattr(self, "_queue") | |
def __str__(self) -> str: | |
return f"Subscription ({self.depth}) to {self._topic}" | |
def __repr__(self) -> str: | |
return f"{self.__class__.__name__}({self._topic!r}, depth={self.depth})" |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment