Skip to content

Instantly share code, notes, and snippets.

@wfng92
Created November 17, 2020 04:58
Show Gist options
  • Star 6 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save wfng92/f7a125808ea3b2f9ee0ee84976da694b to your computer and use it in GitHub Desktop.
Save wfng92/f7a125808ea3b2f9ee0ee84976da694b to your computer and use it in GitHub Desktop.
from typing import List
import queue
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.responses import HTMLResponse
from apscheduler.schedulers.asyncio import AsyncIOScheduler
# Application instance. The message queue and its per-run limit are stored as
# ad-hoc attributes on it so the handlers below can reach them through `app`.
app = FastAPI()
# FIFO of pending {"message", "websocket"} dicts filled by the websocket handler.
app.queue_system = queue.Queue()
# Upper bound on how many queue entries myfunc() takes per scheduled run.
app.queue_limit = 5
# Single-page chat client returned by the "/" route. The embedded script uses
# Date.now() as the client id, opens a WebSocket to
# ws://localhost:8000/ws/<client_id>, appends every received message to the
# #messages list, and sends the text input's value when the form is submitted.
# NOTE(review): the WebSocket URL is hard-coded to localhost:8000, so the page
# presumably only works when the server is reachable at that address -- confirm.
html = """
<!DOCTYPE html>
<html>
<head>
<title>Chat</title>
</head>
<body>
<h1>WebSocket Chat</h1>
<h2>Your ID: <span id="ws-id"></span></h2>
<form action="" onsubmit="sendMessage(event)">
<input type="text" id="messageText" autocomplete="off"/>
<button>Send</button>
</form>
<ul id='messages'>
</ul>
<script>
var client_id = Date.now()
document.querySelector("#ws-id").textContent = client_id;
var ws = new WebSocket(`ws://localhost:8000/ws/${client_id}`);
ws.onmessage = function(event) {
var messages = document.getElementById('messages')
var message = document.createElement('li')
var content = document.createTextNode(event.data)
message.appendChild(content)
messages.appendChild(message)
};
function sendMessage(event) {
var input = document.getElementById("messageText")
ws.send(input.value)
input.value = ''
event.preventDefault()
}
</script>
</body>
</html>
"""
class ConnectionManager:
    """Registry of the WebSocket connections that are currently open.

    The registry is a plain list exposed as ``active_connections``; sockets
    are added by ``connect`` and removed by ``disconnect``.
    """

    def __init__(self):
        # Sockets appear in the order in which they were accepted.
        self.active_connections: List[WebSocket] = []

    async def connect(self, websocket: WebSocket):
        """Accept the handshake on *websocket*, then register it as active."""
        await websocket.accept()
        self.active_connections.append(websocket)

    def disconnect(self, websocket: WebSocket):
        """Remove *websocket* from the registry.

        Raises:
            ValueError: If *websocket* is not currently registered.
        """
        self.active_connections.remove(websocket)

    async def send_personal_message(self, message: str, websocket: WebSocket):
        """Send *message* as text to *websocket* only."""
        await websocket.send_text(message)
# Module-level registry instance used by myfunc() and the websocket endpoint.
manager = ConnectionManager()
async def myfunc():
    """Deliver up to ``app.queue_limit`` queued messages back to their senders.

    Each queue entry is a dict holding the received ``"message"`` text and the
    ``"websocket"`` it arrived on. Entries whose socket is no longer listed in
    ``manager.active_connections`` are discarded without a reply, but they
    still count towards the limit for this run.
    """
    for _ in range(app.queue_limit):
        try:
            # Take the item directly rather than checking empty() first:
            # empty() gives no guarantee about the get that follows it.
            entry = app.queue_system.get_nowait()
        except queue.Empty:
            # Queue drained; nothing left to deliver in this run.
            break
        recipient = entry['websocket']
        if recipient in manager.active_connections:
            await manager.send_personal_message(f"You wrote: {entry['message']}", recipient)
# Run myfunc() every 5 seconds on an asyncio-based scheduler.
# NOTE(review): the scheduler is started at import time; presumably this relies
# on the event loop that later serves the app -- confirm against APScheduler docs.
app.scheduler = AsyncIOScheduler()
app.scheduler.add_job(myfunc, 'interval', seconds=5)
app.scheduler.start()
@app.get("/")
async def main():
    """Serve the chat page held in ``html``."""
    page = HTMLResponse(html)
    return page
@app.websocket("/ws/{client_id}")
async def websocket_endpoint(websocket: WebSocket, client_id: int):
    """Accept a chat client and queue every text message it sends.

    Messages are not answered here: each one is put on ``app.queue_system``
    together with the socket it arrived on.

    Args:
        websocket: The connection for this client.
        client_id: Id taken from the URL path; used only in the disconnect
            log line.
    """
    await manager.connect(websocket)
    try:
        while True:
            message = await websocket.receive_text()
            app.queue_system.put({"message": message, "websocket": websocket})
    except WebSocketDisconnect:
        print(f"Client #{client_id} disconnected")
    finally:
        # Unregister on every exit path, not only a clean disconnect, so a
        # handler that ends with any other error does not leave a dead socket
        # in manager.active_connections.
        manager.disconnect(websocket)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment