Skip to content

Instantly share code, notes, and snippets.

@justinfay
Created May 31, 2019 14:20
Show Gist options
  • Save justinfay/1dfca1b1971e413b4b8d9257337e51b7 to your computer and use it in GitHub Desktop.
Save justinfay/1dfca1b1971e413b4b8d9257337e51b7 to your computer and use it in GitHub Desktop.
An example of using pubsub with websockets
"""
A websocket example.
We want to be able to subscribe and unsubscribe to streams.
"""
import asyncio
import collections
import json
import logging
import random
import websockets
# Set up the root logger with the standard library's default configuration.
logging.basicConfig()
# Subscription registry: maps each websocket connection to a dict of
# {stream name: async generator}. An entry is set to None once the client
# unsubscribes from that stream. The defaultdict creates the inner dict on
# first access for a connection.
USER_SUBSCRIPTIONS = collections.defaultdict(dict)
async def generator(type_):
    """Endlessly produce random readings labelled with the stream name.

    Every item is a one-entry dict whose key is ``type_`` formatted as a
    string and whose value is a random integer between 0 and 100 inclusive.
    Once the consumer asks for the next item, the generator first sleeps for
    a random whole number of seconds (0, 1 or 2) before producing it.
    """
    while True:
        print('generating a number')
        reading = {f'{type_}': random.randint(0, 100)}
        yield reading
        # Pause only after the consumer resumes us, so the first item is
        # available immediately.
        delay = random.randint(0, 2)
        await asyncio.sleep(delay)
async def publisher_handler(websocket, type_, path):
    """Forward items from one subscription's generator to the client.

    The generator registered in ``USER_SUBSCRIPTIONS[websocket][type_]`` at
    the time this coroutine starts is the only one it ever reads from. Each
    item is JSON-encoded and sent over ``websocket``. The coroutine stops
    when the registry entry no longer refers to that generator (the client
    unsubscribed, or the entry was replaced or removed) or when the
    connection is closed.

    Args:
        websocket: connection to send on; also the key into
            ``USER_SUBSCRIPTIONS``.
        type_: name of the subscribed stream.
        path: request path handed over by the caller; not used here.
    """
    def current_stream():
        # Plain .get() lookups: indexing the defaultdict would re-create an
        # entry for a connection that has already been removed.
        return USER_SUBSCRIPTIONS.get(websocket, {}).get(type_)

    stream = current_stream()
    if stream is None:
        # Unsubscribed (or never subscribed) before we got to run.
        return
    try:
        async for message in stream:
            # The subscription may have changed while we were waiting for
            # the item; do not deliver stale data or take over a generator
            # that belongs to a newer publisher.
            if current_stream() is not stream:
                break
            await websocket.send(json.dumps(message))
    except websockets.ConnectionClosed:
        # The client went away; nothing left to publish to.
        logging.debug('connection closed, stopping publisher for %r', type_)
    finally:
        # Finalize the generator we own so it does not linger until GC.
        await stream.aclose()
async def hub(websocket, path=None):
    """Handle one client connection: start and stop stream publishers.

    Every incoming frame is expected to hold a JSON object.
    ``{"subscribe": name}`` stores a fresh ``generator(name)`` in
    ``USER_SUBSCRIPTIONS[websocket][name]`` and starts a
    ``publisher_handler`` task for it, replacing any publisher already
    running for that name. ``{"unsubscribe": name}`` sets the entry to
    ``None`` and cancels the publisher. Frames that are not valid JSON
    objects are logged and ignored. When the connection ends, all publisher
    tasks started here are cancelled and the connection's registry entry is
    removed.

    Args:
        websocket: the client connection; iterated for incoming messages and
            used as the key into ``USER_SUBSCRIPTIONS``.
        path: request path supplied by the server; only forwarded to
            ``publisher_handler``. Optional so the handler also works when
            the server calls it with the connection alone.
    """
    publishers = {}  # stream name -> publisher task started for this client

    def stop_publisher(name):
        task = publishers.pop(name, None)
        if task is not None:
            task.cancel()

    try:
        # Iterate exactly once: the iteration ends when the connection is
        # closed, and restarting it in an outer loop on a dead connection
        # would never return.
        async for raw in websocket:
            try:
                message = json.loads(raw)
            except ValueError:
                logging.warning('ignoring malformed message: %r', raw)
                continue
            if not isinstance(message, dict):
                logging.warning('ignoring non-object message: %r', message)
                continue
            if 'subscribe' in message:
                name = message['subscribe']
                # Never leave two publishers running for the same stream.
                stop_publisher(name)
                USER_SUBSCRIPTIONS[websocket][name] = generator(name)
                publishers[name] = asyncio.ensure_future(
                    publisher_handler(websocket, name, path))
            elif 'unsubscribe' in message:
                name = message['unsubscribe']
                USER_SUBSCRIPTIONS[websocket][name] = None
                stop_publisher(name)
    finally:
        # Connection is gone (cleanly or not): stop publishing and drop the
        # registry entry so closed connections do not accumulate.
        for task in publishers.values():
            task.cancel()
        USER_SUBSCRIPTIONS.pop(websocket, None)
async def client():
    """Demo client: exercise subscribe/unsubscribe against the local server.

    Connects to ``ws://localhost:6789``, subscribes to ``foo`` and prints
    five received messages before unsubscribing. It then subscribes to
    ``bill`` and ``bar``, prints ten more messages, unsubscribing from
    ``bill`` after the fifth and from ``bar`` after the tenth.
    """
    async with websockets.connect('ws://localhost:6789') as websocket:

        async def send_json(payload):
            await websocket.send(json.dumps(payload))

        async def show_next():
            print(await websocket.recv())

        # Phase one: a single stream, five messages.
        await send_json({'subscribe': 'foo'})
        for _ in range(5):
            await show_next()
        await send_json({'unsubscribe': 'foo'})

        # Phase two: two streams, dropping one halfway through.
        await send_json({'subscribe': 'bill'})
        await send_json({'subscribe': 'bar'})
        for received in range(1, 11):
            await show_next()
            if received == 5:
                await send_json({'unsubscribe': 'bill'})
        await send_json({'unsubscribe': 'bar'})
if __name__ == "__main__":
    import sys

    # Run the demo client only when explicitly asked for with a first
    # argument of 'client'. Anything else starts the server, including no
    # argument at all, which previously raised IndexError on sys.argv[1].
    if len(sys.argv) > 1 and sys.argv[1] == 'client':
        asyncio.get_event_loop().run_until_complete(client())
    else:
        start_server = websockets.serve(hub, 'localhost', 6789)
        asyncio.get_event_loop().run_until_complete(start_server)
        # Keep serving until the process is interrupted.
        asyncio.get_event_loop().run_forever()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment