@hrafnkelle
Created December 14, 2017 22:01
Minimal event bus?
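import asyncio
from collections import namedtuple

# An event carries the source it came from and an arbitrary payload.
Event = namedtuple('Event', ['source', 'data'])

# Maps each source to the list of callbacks registered for it.
observers = {}

def register(source, callback):
    # callback must be a coroutine function taking the event as its only argument.
    observers.setdefault(source, []).append(callback)

def notify(event):
    print(event)
    if event.source in observers:
        for observer in observers[event.source]:
            # Schedule the observer on the event loop; the result is not awaited here.
            asyncio.ensure_future(observer(event))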
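A possible way to exercise the bus is sketched below. It repeats the definitions so that it runs on its own, and makes one change that is not in the gist: `notify` returns the scheduled futures so the caller can wait for them. The `'thermometer'` source, the `on_temperature` callback, and the `received` list are hypothetical names chosen for illustration.

```python
import asyncio
from collections import namedtuple

Event = namedtuple('Event', ['source', 'data'])
observers = {}

def register(source, callback):
    observers.setdefault(source, []).append(callback)

def notify(event):
    # Assumption for this sketch: return the scheduled futures so callers
    # can await them. The original notify() returns nothing.
    return [asyncio.ensure_future(observer(event))
            for observer in observers.get(event.source, [])]

received = []  # hypothetical sink used to observe what the callback saw

async def on_temperature(event):
    # Hypothetical observer: records the payload of each event it receives.
    received.append(event.data)

async def main():
    register('thermometer', on_temperature)
    tasks = notify(Event(source='thermometer', data=21.5))
    # No observer is registered for this source, so nothing is scheduled.
    notify(Event(source='unknown', data=None))
    await asyncio.gather(*tasks)

asyncio.run(main())
print(received)  # prints [21.5]
```

Because `asyncio.ensure_future` only schedules the coroutine, observers run the next time the event loop gets control. Without awaiting the returned futures (or otherwise keeping the loop running), a program that exits right after `notify` may never execute its observers.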