Skip to content

Instantly share code, notes, and snippets.

@graingert
Forked from foxmask/client.py
Created Sep 3, 2021
Embed
What would you like to do?
websocket client
#!/usr/bin/env python3
import sys
import asyncio
import json
import websockets
async def hello():
uri = "ws://localhost:8888"
async with websockets.connect(uri) as websocket:
try:
while True:
await websocket.send(json.dumps({"type": "info", 'action': 'minus'}))
await websocket.recv()
await asyncio.sleep(0.5)
finally:
await websocket.send(json.dumps({"type": "disconnect"}))
async def amain():
await hello()
return 0
def main():
asyncio.run(hello())
if __name__ == "__main__":
sys.exit(main())
#!/usr/bin/env python
# WS server example that synchronizes state across clients
import asyncio
import json
import logging
import websockets
from rich.console import Console
console = Console()
logging.basicConfig()
STATE = {"value": 0}
USERS = set()
def state_event():
return json.dumps({"type": "state", **STATE})
def users_event():
return json.dumps({"type": "users", "count": len(USERS)})
async def notify_state():
if USERS: # asyncio.wait doesn't accept an empty list
message = state_event()
await asyncio.wait([user.send(message) for user in USERS])
async def notify_users():
if USERS: # asyncio.wait doesn't accept an empty list
message = users_event()
await asyncio.wait([user.send(message) for user in USERS])
async def register(websocket):
USERS.add(websocket)
await notify_users()
async def unregister(websocket):
USERS.remove(websocket)
await notify_users()
async def counter(websocket, path):
# register(websocket) sends user_event() to websocket
await register(websocket)
try:
await websocket.send(state_event())
async for message in websocket:
console.print(message)
data = json.loads(message)
if data["action"] == "minus":
STATE["value"] -= 1
await notify_state()
elif data["action"] == "plus":
STATE["value"] += 1
await notify_state()
else:
logging.error("unsupported event: %s", data)
finally:
await unregister(websocket)
start_server = websockets.serve(counter, "localhost", 8888)
console.print("serveur demarre")
asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()
from starlette.applications import Starlette
import starlette.websockets
import uvicorn
from rich.console import Console
console = Console()
app = Starlette()
@app.websocket_route('/')
async def websocket_endpoint(websocket):
await websocket.accept()
try:
# Process incoming messages
while True:
mesg = await websocket.receive_json(mode="text")
if mesg["type"] == "disconnect":
try:
await websocket.receive_bytes()
except starlette.websockets.WebSocketDisconnect:
return
else:
raise Exception("client did not disconnect")
console.print(f"messsage to submit {mesg}")
hmm = await websocket.send_json(mesg)
console.print(f"hmmm {hmm}")
console.print(f"messsage submitted {mesg}")
finally:
await websocket.close()
if __name__ == '__main__':
uvicorn.run(app, host='127.0.0.1', port=8888)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment