Skip to content

Instantly share code, notes, and snippets.

@Finwood
Created November 9, 2022 17:26
Show Gist options
  • Save Finwood/786dd86f414ee253902ca7be0d7263f5 to your computer and use it in GitHub Desktop.
Save Finwood/786dd86f414ee253902ca7be0d7263f5 to your computer and use it in GitHub Desktop.
Simple Asynchronous PubSub Implementation
# Inspired by https://gist.github.com/appeltel/fd3ddeeed6c330c7208502462639d2c9
import asyncio
import logging
from typing import Generic, TypeVar
T = TypeVar("T")
class Topic(Generic[T]):
def __init__(self, name: str):
self.name = name
self.logger = logging.getLogger(f"{__name__}.topic.{self.name}")
self._subscriptions: set[asyncio.Queue[T]] = set()
self.logger.info(f"{self} created")
def publish(self, message: T) -> None:
for queue in self._subscriptions:
if queue.full():
# pop oldest item
queue.get_nowait()
queue.put_nowait(message)
self.logger.debug(message)
def subscribe(self, depth: int = 0) -> "Subscription[T]":
return Subscription(self, depth)
def __str__(self) -> str:
description = f"Topic '{self.name}'"
if self._subscriptions:
n = len(self._subscriptions)
description += f", {n} subscriber{'s' if n > 1 else ''}"
return description
def __repr__(self) -> str:
return f"{self.__class__.__name__}({self.name!r})"
class Subscription(Generic[T]):
def __init__(self, topic: Topic[T], depth: int = 0):
self._topic = topic
self.depth = depth
def __enter__(self) -> asyncio.Queue[T]:
self._queue: asyncio.Queue[T] = asyncio.Queue(self.depth)
self._topic._subscriptions.add(self._queue)
self._topic.logger.info("Subscription added")
return self._queue
def __exit__(self, exc_type, exc_value, traceback) -> None:
self._topic._subscriptions.remove(self._queue)
self._topic.logger.info("Subscription removed")
delattr(self, "_queue")
def __str__(self) -> str:
return f"Subscription ({self.depth}) to {self._topic}"
def __repr__(self) -> str:
return f"{self.__class__.__name__}({self._topic!r}, depth={self.depth})"
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment