Skip to content

Instantly share code, notes, and snippets.

@appeltel
Last active July 29, 2023 21:17
Show Gist options
  • Star 31 You must be signed in to star a gist
  • Fork 6 You must be signed in to fork a gist
  • Save appeltel/fd3ddeeed6c330c7208502462639d2c9 to your computer and use it in GitHub Desktop.
Save appeltel/fd3ddeeed6c330c7208502462639d2c9 to your computer and use it in GitHub Desktop.
asyncio pubsub example
import asyncio
import random
class Hub:
    """Fan-out point that delivers each published message to every subscriber.

    Attributes:
        subscriptions: Set of queue objects currently registered to receive
            published messages.
    """

    def __init__(self):
        self.subscriptions = set()

    def publish(self, message):
        """Put *message* on every subscribed queue without waiting."""
        for queue in self.subscriptions:
            queue.put_nowait(message)
class Subscription:
    """Context manager that subscribes a fresh queue to a hub.

    Entering the context adds the queue to ``hub.subscriptions`` and returns
    it; leaving the context removes it again, whether or not an exception
    was raised inside the ``with`` body.

    Args:
        hub: Object exposing a ``subscriptions`` set.
    """

    def __init__(self, hub):
        self.hub = hub
        self.queue = asyncio.Queue()

    def __enter__(self):
        # Use the hub given to the constructor rather than whatever
        # module-level name ``hub`` happens to be bound to.
        self.hub.subscriptions.add(self.queue)
        return self.queue

    def __exit__(self, exc_type, exc_value, traceback):
        # discard() keeps a repeated exit from raising KeyError.
        self.hub.subscriptions.discard(self.queue)
async def reader(name, hub):
    """Subscribe to *hub* after a random delay and print received messages.

    The reader waits up to 15 seconds before subscribing, then prints every
    message it receives. It stops when it receives ``'SHUTDOWN'`` or, with
    10% probability after each message, decides it has read enough.

    Args:
        name: Label used in the printed output.
        hub: Hub passed to ``Subscription``.
    """
    await asyncio.sleep(random.random() * 15)
    print(f'Reader {name} has decided to subscribe now!')

    msg = ""
    with Subscription(hub) as queue:
        while msg != 'SHUTDOWN':
            msg = await queue.get()
            print(f'Reader {name} got message: {msg}')

            if random.random() < 0.1:
                print(f'Reader {name} has read enough')
                break

    # Reached only after the subscription context has exited.
    print(f'Reader {name} is shutting down')
async def writer(iterations, hub, *, interval=3):
    """Publish *iterations* greeting messages to *hub*, then ``'SHUTDOWN'``.

    Args:
        iterations: Number of ``'Hello world - N'`` messages to publish.
        hub: Object exposing ``subscriptions`` and ``publish(message)``.
        interval: Seconds to sleep after each greeting (default 3).
    """
    for x in range(iterations):
        print(f'Writer: I have {len(hub.subscriptions)} subscribers now')
        hub.publish(f'Hello world - {x}')
        await asyncio.sleep(interval)

    hub.publish('SHUTDOWN')
if __name__ == '__main__':
    # Keep ``hub`` bound at module level, as the original script did.
    hub = Hub()

    async def _main():
        """Run one writer and ten readers concurrently until all finish."""
        readers = [reader(x, hub) for x in range(10)]
        await asyncio.gather(writer(10, hub), *readers)

    # asyncio.run() creates and closes the event loop itself; the coroutines
    # are created inside the running loop, which gather() expects.
    asyncio.run(_main())
import asyncio
from aiohttp import web
class Subscription:
    """Context manager that subscribes a fresh queue to a hub.

    Entering the context adds the queue to ``hub.subscriptions`` and returns
    it; leaving the context removes it again, whether or not an exception
    was raised inside the ``with`` body.

    Args:
        hub: Object exposing a ``subscriptions`` set.
    """

    def __init__(self, hub):
        self.hub = hub
        self.queue = asyncio.Queue()

    def __enter__(self):
        # Use the hub given to the constructor rather than the module-level
        # name ``hub``.
        self.hub.subscriptions.add(self.queue)
        return self.queue

    def __exit__(self, exc_type, exc_value, traceback):
        # discard() keeps a repeated exit from raising KeyError.
        self.hub.subscriptions.discard(self.queue)
class Hub:
    """Fan-out point that delivers each published message to every subscriber.

    Attributes:
        subscriptions: Set of queue objects currently registered to receive
            published messages.
    """

    def __init__(self):
        self.subscriptions = set()

    def publish(self, message):
        """Put *message* on every subscribed queue without waiting."""
        for queue in self.subscriptions:
            queue.put_nowait(message)
# Module-level hub shared by the sub() and pub() request handlers below.
hub = Hub()
async def sub(request):
    """Stream every message published on the hub to the client as text.

    Each message is written as one UTF-8 line terminated by CRLF. The handler
    stays subscribed until writing to the client fails or the handler is
    interrupted by an exception; the subscription is removed in either case.

    Args:
        request: Incoming aiohttp request.

    Returns:
        The prepared ``web.StreamResponse``.
    """
    # Status is set through the constructor; assigning ``status_code`` would
    # only create an unrelated attribute on the response object.
    resp = web.StreamResponse(status=200)
    resp.headers['content-type'] = 'text/plain'
    await resp.prepare(request)

    with Subscription(hub) as queue:
        while True:
            msg = await queue.get()
            try:
                # write() is a coroutine in aiohttp 3.x and must be awaited,
                # otherwise nothing is sent.
                await resp.write(f'{msg}\r\n'.encode('utf-8'))
            except ConnectionResetError:
                # NOTE(review): aiohttp is expected to raise this once the
                # client has gone away - confirm for the deployed version.
                break

    return resp
async def pub(request):
    """Publish the ``msg`` query parameter to the hub and acknowledge.

    Args:
        request: Incoming aiohttp request; a missing ``msg`` parameter is
            published as the empty string.

    Returns:
        A plain-text ``web.Response`` with body ``'ok'``.
    """
    msg = request.query.get('msg', '')
    hub.publish(msg)
    return web.Response(text='ok')
if __name__ == '__main__':
    # GET / subscribes to the message stream; POST /?msg=... publishes.
    app = web.Application()
    app.router.add_get('/', sub)
    app.router.add_post('/', pub)
    web.run_app(app)
@sietekk
Copy link

sietekk commented Jan 10, 2020

This was super helpful for me! Thanks so much for putting this somewhere I could find it via Google!

@tsz662
Copy link

tsz662 commented Oct 14, 2022

Great! Thank you so much for sharing this helpful sample.

@cbensimon
Copy link

Thanks for this sample, really clean and helpful!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment