Server-Sent Events (SSE) is a server push technology enabling a server to push real-time updates to a client over a single HTTP connection. Unlike WebSockets, which allow bidirectional communication, SSE is unidirectional, meaning the server can send updates to the client, but the client cannot send messages back to the server over the same connection.
This blog will explore the concept of SSE, how to implement SSE in FastAPI, and a use case that uses Python's `threading.Thread` class to handle real-time data for trading.
SSE allows servers to push information to web clients over HTTP. It is defined in the HTML standard (it was introduced as part of HTML5) and is exposed to browser code through the `EventSource` API. It works by keeping an HTTP connection open, so the server can keep writing updates to the same response.
- Simplicity: Easy to implement compared to WebSockets.
- Automatic Reconnection: Built-in support for automatic reconnection in case of network issues.
- Event Handling: Allows multiple types of events to be defined and handled.
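To make the reconnection and event-type features above more concrete, here is a minimal sketch of how a single event could be serialized in the `text/event-stream` format. The helper name `format_sse` and its parameters are hypothetical and not part of FastAPI or any library; the field names (`event`, `id`, `retry`, `data`) are the ones defined by the SSE format.

```python
from typing import Optional


def format_sse(data: str, event: Optional[str] = None,
               event_id: Optional[str] = None,
               retry_ms: Optional[int] = None) -> str:
    """Serialize one event in the text/event-stream wire format (hypothetical helper)."""
    lines = []
    if event is not None:
        lines.append(f"event: {event}")      # named event type
    if event_id is not None:
        lines.append(f"id: {event_id}")      # sent back as Last-Event-ID on reconnect
    if retry_ms is not None:
        lines.append(f"retry: {retry_ms}")   # reconnection delay hint, in milliseconds
    # A multi-line payload becomes one "data:" field per line
    for chunk in data.split("\n"):
        lines.append(f"data: {chunk}")
    # A blank line terminates the event
    return "\n".join(lines) + "\n\n"


message = format_sse("AAPL 187.20", event="price", event_id="42", retry_ms=3000)
print(message)
```

A browser client would receive this as a `price` event, remember `42` as the last event ID, and wait roughly three seconds before reconnecting if the connection drops.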
Feature | SSE | WebSockets |
---|---|---|
Communication | Unidirectional (server to client) | Bidirectional (server and client) |
Complexity | Simple | More complex |
Connection Overhead | Lower | Higher |
Use Cases | Real-time updates, notifications | Chat applications, gaming |
FastAPI makes it easy to implement SSE by leveraging its support for asynchronous programming and streaming responses.
- Install FastAPI and Uvicorn:

      pip install fastapi uvicorn

- Create a FastAPI App with SSE:

      import asyncio
      import time

      from fastapi import FastAPI
      from fastapi.responses import StreamingResponse

      app = FastAPI()


      async def event_generator():
          # Emit one SSE message per second; each message ends with a blank line
          while True:
              yield f"data: The current time is {time.ctime()}\n\n"
              # Use asyncio.sleep() rather than time.sleep() so the event loop is not blocked
              await asyncio.sleep(1)


      @app.get("/sse")
      async def sse_endpoint():
          return StreamingResponse(event_generator(), media_type="text/event-stream")


      if __name__ == "__main__":
          import uvicorn
          uvicorn.run(app, host="127.0.0.1", port=8000)
Open your browser and navigate to http://127.0.0.1:8000/sse. You should see the server pushing updates to the client every second.
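To illustrate what a client does with the bytes it receives, here is a simplified sketch of an SSE parser that groups incoming lines into events. The function name `parse_sse` and the sample stream are assumptions made for illustration; a browser's `EventSource` handles more cases than this (for example the `retry` field and reconnection).

```python
from typing import Dict, Iterable, Iterator


def parse_sse(lines: Iterable[str]) -> Iterator[Dict[str, str]]:
    """Group text/event-stream lines into events (simplified, hypothetical parser)."""
    event: Dict[str, str] = {}
    for raw in lines:
        line = raw.rstrip("\r\n")
        if line == "":
            # Blank line: dispatch the event collected so far
            if event:
                yield event
                event = {}
            continue
        if line.startswith(":"):
            continue  # comment line, often used as a keep-alive
        field, _, value = line.partition(":")
        if value.startswith(" "):
            value = value[1:]  # a single leading space after the colon is dropped
        if field == "data" and "data" in event:
            event["data"] += "\n" + value  # multiple data lines are joined with "\n"
        else:
            event[field] = value


# Sample stream, made up for this example
stream = [
    "data: The current time is 12:00:00\n",
    "\n",
    ": keep-alive\n",
    "event: price\n",
    "data: AAPL\n",
    "data: 187.20\n",
    "\n",
]
events = list(parse_sse(stream))
print(events)
```

Only the first colon on a line separates the field name from its value, which is why the time string with its own colons survives intact.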
For a trading application, real-time data updates are crucial. We need to handle multiple streams of data asynchronously to update clients with the latest trading information.
We can use Python's `threading.Thread` class in combination with FastAPI to run one thread per stream of trading data and send the updates to the client using SSE.
- Enhanced SSE with Threading:

      import asyncio
      import random
      import threading
      import time

      from fastapi import FastAPI, HTTPException
      from fastapi.responses import StreamingResponse

      app = FastAPI()


      class SSEThread(threading.Thread):
          """Background thread that keeps the latest simulated price for one symbol."""

          def __init__(self, symbol, update_interval):
              # daemon=True so the worker threads do not keep the process alive on shutdown
              super().__init__(daemon=True)
              self.symbol = symbol
              self.update_interval = update_interval
              self._lock = threading.Lock()
              self._price = random.uniform(100, 200)

          def run(self):
              # Produce a new random price every update_interval seconds
              while True:
                  price = random.uniform(100, 200)
                  with self._lock:
                      self._price = price
                  time.sleep(self.update_interval)

          def get_data(self):
              # Return the latest price as an SSE message without blocking the caller
              with self._lock:
                  price = self._price
              return f"data: {self.symbol} price is {price:.2f}\n\n"


      threads = {
          "AAPL": SSEThread("AAPL", 1),
          "GOOGL": SSEThread("GOOGL", 2),
      }

      for thread in threads.values():
          thread.start()


      async def event_generator(symbol):
          thread = threads[symbol]
          while True:
              yield thread.get_data()
              # Non-blocking sleep keeps the event loop free for other clients
              await asyncio.sleep(thread.update_interval)


      @app.get("/sse/{symbol}")
      async def sse_endpoint(symbol: str):
          if symbol not in threads:
              raise HTTPException(status_code=404, detail="Symbol not found")
          return StreamingResponse(event_generator(symbol), media_type="text/event-stream")


      if __name__ == "__main__":
          import uvicorn
          uvicorn.run(app, host="127.0.0.1", port=8000)
- Threading Class: The `SSEThread` class is defined to create a thread for each trading symbol. It generates random trading data for demonstration purposes.
- Threads Dictionary: A dictionary to hold instances of `SSEThread` for different symbols.
- Event Generator: An asynchronous generator function that fetches data from the respective thread.
- Endpoint: The SSE endpoint `/sse/{symbol}` streams real-time data for the given trading symbol.
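As an alternative to having the event generator ask the thread for its latest value, a worker thread can hand each update to the event loop through an `asyncio.Queue`. The sketch below is one possible way to do this, not the design used above: the names `price_producer` and `event_stream` are hypothetical, the prices are random, and the producer stops after a few ticks so the example terminates.

```python
import asyncio
import random
import threading
import time


def price_producer(symbol, interval, count, loop, queue):
    """Runs in a worker thread and hands each tick to the event loop."""
    for _ in range(count):
        message = f"data: {symbol} price is {random.uniform(100, 200):.2f}\n\n"
        # asyncio.Queue is not thread-safe, so schedule the put on the loop
        loop.call_soon_threadsafe(queue.put_nowait, message)
        time.sleep(interval)
    loop.call_soon_threadsafe(queue.put_nowait, None)  # sentinel: producer finished


async def event_stream(queue):
    """Async generator that yields messages as soon as the thread produces them."""
    while True:
        message = await queue.get()
        if message is None:
            break
        yield message


async def main():
    loop = asyncio.get_running_loop()
    queue = asyncio.Queue()
    worker = threading.Thread(
        target=price_producer,
        args=("AAPL", 0.01, 3, loop, queue),
        daemon=True,
    )
    worker.start()
    # Collect the stream here; in FastAPI the generator would be passed to StreamingResponse
    return [message async for message in event_stream(queue)]


messages = asyncio.run(main())
print(messages)
```

A single queue delivers each message to only one consumer, so a server with several connected clients would likely need one queue per client.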
graph TD
Start --> InitFastAPI[Initialize FastAPI App]
InitFastAPI --> DefineThreads[Define Threads for Trading Symbols]
DefineThreads --> StartThreads[Start Threads]
StartThreads --> SSEEndpoint[Define SSE Endpoint]
SSEEndpoint --> RequestClient[Client Requests /sse/symbol]
RequestClient --> CheckSymbol[Check if Symbol Exists]
CheckSymbol -->|Symbol Exists| GetDataThread[Fetch Data from Thread]
GetDataThread --> SendDataClient[Send Data to Client via SSE]
CheckSymbol -->|Symbol Not Found| ReturnError[Return Error Response]
subgraph ThreadManagement
DefineThreads --> Thread1[SSEThread for AAPL]
DefineThreads --> Thread2[SSEThread for GOOGL]
Thread1 --> GenerateData1[Generate Data for AAPL]
Thread2 --> GenerateData2[Generate Data for GOOGL]
StartThreads --> StartThread1[Start Thread for AAPL]
StartThreads --> StartThread2[Start Thread for GOOGL]
GetDataThread --> FetchData1[Fetch Data from AAPL Thread]
GetDataThread --> FetchData2[Fetch Data from GOOGL Thread]
end
subgraph DataFlow
RequestClient --> EventGenerator[Call Event Generator]
EventGenerator --> FetchDataThread[Fetch Data from Corresponding Thread]
FetchDataThread --> StreamingResponse[Return Streaming Response]
StreamingResponse --> ClientUpdates[Client Receives Real-time Updates]
end
Open your browser and navigate to http://127.0.0.1:8000/sse/AAPL and http://127.0.0.1:8000/sse/GOOGL. You should see real-time updates for both trading symbols.
Server-Sent Events (SSE) provide a straightforward way to push real-time updates from the server to the client. By integrating SSE with FastAPI and leveraging threading, we can handle multiple streams of data concurrently, which makes this combination a good fit for real-time trading applications. Clients receive updates as soon as the server produces them, which improves the responsiveness and user experience of trading platforms.