Skip to content

Instantly share code, notes, and snippets.

@tk42
Last active October 7, 2022 04:24
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save tk42/005d4471cc6407d4abc1fa0f5b2cc55b to your computer and use it in GitHub Desktop.
Save tk42/005d4471cc6407d4abc1fa0f5b2cc55b to your computer and use it in GitHub Desktop.
WebsocketClient on jupyter notebook
import sys
import ssl
import time
import json
import threading
import asyncio
import websockets
URL = "wss://localhost/"
ctx = ssl.create_default_context(purpose=ssl.Purpose.CLIENT_AUTH)
class WebSocketClient:
def __init__(self, loop):
asyncio.set_event_loop(loop)
self.loop = loop
async def on_data(self, websocket, req):
# write processing whenever a new data is arrieved
pass
async def polling(self, websocket):
while True:
req = json.loads(await websocket.recv())
if req == "":
await websocket.pong()
else:
await self.on_data(websocket, req)
def start(self):
websocket = self.loop.run_until_complete(
asyncio.ensure_future(websockets.connect(URL, ssl=ctx))
)
self.loop.run_until_complete(self.polling(websocket))
self.loop.run_forever()
def create():
c = WebSocketClient(asyncio.new_event_loop())
c.start()
t = threading.Thread(target=create)
try:
t.setDaemon(True)
t.start()
while t.is_alive():
continue # blocking
except Exception as e:
print(str(e))
t.join()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment