Skip to content

Instantly share code, notes, and snippets.

@paulwinex
Last active August 24, 2023 09:58
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save paulwinex/e5f17ca3712bc3e5a340e143c51bf54e to your computer and use it in GitHub Desktop.
Save paulwinex/e5f17ca3712bc3e5a340e143c51bf54e to your computer and use it in GitHub Desktop.
fastapi socketio setup
import asyncio
import socketio
# Shared asyncio Socket.IO client; both the Socket.IO and Engine.IO layers
# are asked to log so the handshake and every packet show up on the console.
sio = socketio.AsyncClient(
    logger=True,
    engineio_logger=True,
)
async def fun():
    """Connect to the local Socket.IO server, emit three messages, disconnect.

    Connects over the websocket transport to ``http://127.0.0.1:8001/ws``
    using the Socket.IO path ``/ws/socket.io/``, emits three ``'message'``
    events carrying ``{'data': 'Hello World!'}``, then disconnects and
    prints ``exit``.

    Any exception raised while connecting or emitting propagates to the
    caller; once the connection has been established the client is always
    disconnected first.
    """
    await sio.connect(
        "http://127.0.0.1:8001/ws",
        transports=['websocket'],
        socketio_path='/ws/socket.io/'
    )
    try:
        # NOTE(review): presumably a short grace period so the connection
        # settles before the first emit -- confirm it is still needed.
        await asyncio.sleep(0.5)
        for _ in range(3):
            await sio.emit('message', {'data': 'Hello World!'})
            await asyncio.sleep(0.3)
    finally:
        # Disconnect even if an emit fails, so the connection is not leaked.
        await sio.disconnect()
    print('exit')
if __name__ == '__main__':
    # asyncio.run() creates a fresh event loop, runs the coroutine to
    # completion and closes the loop afterwards. It replaces the deprecated
    # pattern of calling asyncio.get_event_loop() / asyncio.gather() while
    # no loop is running.
    asyncio.run(fun())
import socketio
import uvicorn
from fastapi import FastAPI
# Socket.IO server in ASGI mode so it can be served alongside FastAPI.
sio = socketio.AsyncServer(async_mode="asgi")
app = FastAPI()

# ASGI wrapper around the Socket.IO server; this is the one mounted below.
sio_app = socketio.ASGIApp(sio)

# A second wrapper around the same server. Nothing in this snippet uses it.
# NOTE(review): presumably kept so the Socket.IO app can be served on its
# own -- confirm before removing.
app1 = socketio.ASGIApp(sio)

# Expose the Socket.IO application under the /ws/ prefix of the FastAPI app.
app.mount("/ws/", sio_app)
# Plain HTTP endpoint at the site root; answers with a fixed greeting.
# (Documented with comments rather than a docstring so the generated
# OpenAPI description stays unchanged.)
@app.get('/')
def index():
    greeting = "Hello"
    return greeting
@sio.on("connect")
def connect(sid, environ):
    """Handle the Socket.IO ``connect`` event by logging it to stdout.

    ``sid`` and ``environ`` are supplied by the Socket.IO server and are
    not used here.
    """
    notice = "User connected"
    print(notice)
@sio.on("disconnect")
def disconnect(*args):
    """Handle the Socket.IO ``disconnect`` event by logging it to stdout.

    The handler is registered through the decorator's event string, so the
    function name does not affect dispatch. It has its own name so that it
    does not redefine another handler at module level.

    Args:
        *args: Whatever positional arguments the server passes to the
            handler; they are printed as a tuple.
    """
    print("User disconnected", args)
@sio.on("message")
def message(sid, data, *args):
    """Handle the Socket.IO ``message`` event by printing its payload.

    The handler is registered through the decorator's event string, so the
    function name does not affect dispatch. It has its own name so that it
    does not redefine another handler at module level.

    Args:
        sid: Session identifier supplied by the server (unused).
        data: Payload of the event; printed to stdout.
        *args: Any extra positional arguments of the event (ignored).
    """
    print("User message", data)
if __name__ == "__main__":
    # Serve the FastAPI app (with the mounted Socket.IO app) on every
    # network interface, port 8001.
    uvicorn.run(
        app,
        host='0.0.0.0',
        port=8001,
    )
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment