Created
May 31, 2019 14:20
-
-
Save justinfay/1dfca1b1971e413b4b8d9257337e51b7 to your computer and use it in GitHub Desktop.
An example of using pubsub with websockets
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
A websocket example. | |
We want to be able to subscribe and unsbscribe to streams. | |
""" | |
import asyncio | |
import collections | |
import json | |
import logging | |
import random | |
import websockets | |
logging.basicConfig() | |
USER_SUBSCRIPTIONS = collections.defaultdict(dict) | |
async def generator(type_): | |
while True: | |
print('generating a number') | |
yield {'{}'.format(type_): random.randint(0, 100)} | |
await asyncio.sleep(random.randint(0, 2)) | |
async def publisher_handler(websocket, type_, path): | |
while True: | |
try: | |
message = await USER_SUBSCRIPTIONS[websocket][type_].__anext__() | |
except AttributeError: | |
break | |
await websocket.send(json.dumps(message)) | |
async def hub(websocket, path): | |
while True: | |
async for message in websocket: | |
message = json.loads(message) | |
if 'subscribe' in message: | |
USER_SUBSCRIPTIONS[websocket][message['subscribe']] = generator(message['subscribe']) | |
asyncio.ensure_future(publisher_handler(websocket, message['subscribe'], path)) | |
elif 'unsubscribe' in message: | |
USER_SUBSCRIPTIONS[websocket][message['unsubscribe']] = None | |
async def client(): | |
async with websockets.connect( | |
'ws://localhost:6789') as websocket: | |
await websocket.send(json.dumps({'subscribe': 'foo'})) | |
i = 0 | |
while True: | |
msg = await websocket.recv() | |
print(msg) | |
i += 1 | |
if i == 5: | |
await websocket.send(json.dumps({'unsubscribe': 'foo'})) | |
break | |
await websocket.send(json.dumps({'subscribe': 'bill'})) | |
await websocket.send(json.dumps({'subscribe': 'bar'})) | |
i = 0 | |
while True: | |
msg = await websocket.recv() | |
print(msg) | |
i += 1 | |
if i == 5: | |
await websocket.send(json.dumps({'unsubscribe': 'bill'})) | |
elif i == 10: | |
await websocket.send(json.dumps({'unsubscribe': 'bar'})) | |
break | |
if __name__ == "__main__": | |
import sys | |
if sys.argv[1] == 'client': | |
asyncio.get_event_loop().run_until_complete(client()) | |
else: | |
start_server = websockets.serve(hub, 'localhost', 6789) | |
asyncio.get_event_loop().run_until_complete(start_server) | |
asyncio.get_event_loop().run_forever() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment