Skip to content

Instantly share code, notes, and snippets.

Created Nov 2, 2018
What would you like to do?
aio pubsub pattern
from collections import defaultdict
from asyncio import gather
class Pubsub:
channels = defaultdict(list)
def add_handler(self, key: str, handler):
async def handle(self, ctx, key: str, event):
await gather(*(handler(ctx, event) for handler in self.channels[key]))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment