Skip to content

Instantly share code, notes, and snippets.

@jaycosaur
Last active July 18, 2022 19:26
Show Gist options
  • Save jaycosaur/890b3e9a9041ba8f441583af1a1a4e89 to your computer and use it in GitHub Desktop.
Save jaycosaur/890b3e9a9041ba8f441583af1a1a4e89 to your computer and use it in GitHub Desktop.
Async and Sync queue message multicasting to multiple queues. This is the implementation of a message fanout strategy for worker threads and processes. Note this doesn't create worker threads / processes, it only manages (in a blocking way) multicasting messages.
from typing import Type, Set, Any
from multiprocessing import Queue
import asyncio
class MulticastQueue:
def __init__(self, queue_constructor: Type[Queue] = Queue) -> None:
self.subscribers: Set[Queue] = set()
self.constructor = queue_constructor
def register(self) -> Queue:
queue = self.constructor()
self.subscribers.add(queue)
return queue
def unregister(self, subscriber: Queue) -> None:
self.subscribers.remove(subscriber)
def multicast(self, message: Any) -> None:
for subscriber in self.subscribers:
subscriber.put(message)
class AsyncMulticastQueue:
def __init__(self) -> None:
self.subscribers: Set[asyncio.Queue] = set()
def register(self) -> asyncio.Queue:
queue: asyncio.Queue = asyncio.Queue()
self.subscribers.add(queue)
return queue
def unregister(self, subscriber: asyncio.Queue) -> None:
self.subscribers.remove(subscriber)
async def multicast(self, message: Any) -> None:
for subscriber in self.subscribers:
await subscriber.put(message)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment