Last active
August 24, 2023 09:58
-
-
Save paulwinex/e5f17ca3712bc3e5a340e143c51bf54e to your computer and use it in GitHub Desktop.
FastAPI + Socket.IO setup (python-socketio async client and ASGI server)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import socketio | |
# Module-level Socket.IO client used by fun(); logging is switched on for
# both the Socket.IO layer (logger) and the Engine.IO layer (engineio_logger).
sio = socketio.AsyncClient(
    engineio_logger=True,
    logger=True,
)
async def fun():
    """Connect to the local server, send three ``message`` events, disconnect.

    The client connects over the websocket transport to
    ``http://127.0.0.1:8001/ws`` using the ``/ws/socket.io/`` path, waits
    half a second, then emits ``{'data': 'Hello World!'}`` three times with a
    0.3 second pause after each emit. ``exit`` is printed once the client has
    disconnected after a successful run.

    Raises:
        Whatever ``sio.connect`` / ``sio.emit`` raise; the connection is still
        closed before an error from the emit phase propagates.
    """
    await sio.connect(
        "http://127.0.0.1:8001/ws",
        transports=['websocket'],
        socketio_path='/ws/socket.io/',
    )
    try:
        await asyncio.sleep(0.5)
        for _ in range(3):
            await sio.emit('message', {'data': 'Hello World!'})
            await asyncio.sleep(0.3)
    finally:
        # Always release the connection, even if an emit fails part-way.
        await sio.disconnect()
    print('exit')
if __name__ == '__main__':
    # asyncio.run() creates a fresh event loop, runs the coroutine to
    # completion and closes the loop afterwards. This replaces the deprecated
    # get_event_loop() / run_until_complete() pairing and the redundant
    # gather() around a single coroutine.
    asyncio.run(fun())
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import socketio | |
import uvicorn | |
from fastapi import FastAPI | |
# Socket.IO server configured for ASGI, and the FastAPI application.
sio = socketio.AsyncServer(async_mode="asgi")
app = FastAPI()

# ASGI application wrapping the Socket.IO server; mounted on the FastAPI app
# under the "/ws/" prefix.
sio_app = socketio.ASGIApp(sio)

# Second ASGI wrapper around the same server. It is not referenced anywhere
# else in the visible code -- NOTE(review): confirm whether it is still needed.
app1 = socketio.ASGIApp(sio)

app.mount("/ws/", sio_app)
@app.get('/')
def index():
    """Handle GET ``/`` by returning a fixed greeting string."""
    greeting = "Hello"
    return greeting
@sio.on('connect')
def connect(sid, environ):
    """Handle the ``connect`` event by printing a notice.

    Neither ``sid`` nor ``environ`` is used by the handler.
    """
    print("User connected")
@sio.on("disconnect")
def disconnect(*args):
    """Handle the ``disconnect`` event by printing the received arguments.

    The handler accepts any positional arguments and prints them as a tuple
    after the text ``User disconnected``.
    """
    # The function is named after its event rather than reusing the name
    # ``connect``; the event binding comes from the decorator argument, so
    # registration is unaffected by the function name.
    print("User disconnected", args)
@sio.on("message")
def message(sid, data, *args):
    """Handle the ``message`` event by printing its payload.

    Args:
        sid: First argument passed to the handler; unused here.
        data: Payload of the event; printed after ``User message``.
        *args: Any further positional arguments; ignored.
    """
    # The function is named after its event rather than reusing the name
    # ``connect``; the event binding comes from the decorator argument, so
    # registration is unaffected by the function name.
    print("User message", data)
if __name__ == "__main__":
    # Run the FastAPI app (with the Socket.IO app mounted on it) under
    # uvicorn, bound to 0.0.0.0 on port 8001.
    uvicorn.run(
        app,
        port=8001,
        host='0.0.0.0',
    )
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment