@Nishantth1
Created July 1, 2024 14:32

Server-Sent Events (SSE) and How to Implement Them in FastAPI

Server-Sent Events (SSE) is a server push technology enabling a server to push real-time updates to a client over a single HTTP connection. Unlike WebSockets, which allow bidirectional communication, SSE is unidirectional, meaning the server can send updates to the client, but the client cannot send messages back to the server over the same connection.

This post explores the concept of SSE, shows how to implement it in FastAPI, and walks through a use case that uses background threads (Python's threading module) to serve real-time trading data.

Understanding Server-Sent Events (SSE)

What is SSE?

SSE allows servers to push information to web clients over HTTP. It is part of the HTML standard (introduced with HTML5) and is exposed to browsers through the EventSource API. The server keeps an HTTP connection open and sends updates over it as a stream of text messages.

Benefits of SSE

  • Simplicity: Easier to implement than WebSockets, since it runs over plain HTTP.
  • Automatic Reconnection: The browser's EventSource API reconnects automatically if the connection drops.
  • Event Handling: Supports named event types, so clients can attach separate handlers to different kinds of updates.
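
To make the message format behind these features concrete, here is a minimal sketch of a helper that serializes one message for a text/event-stream response. The name format_sse and its parameters are hypothetical and not part of FastAPI. The field names (event, id, retry, data) come from the SSE wire format.

```python
from typing import Optional


def format_sse(data: str, event: Optional[str] = None,
               event_id: Optional[str] = None,
               retry_ms: Optional[int] = None) -> str:
    """Serialize one message in the text/event-stream wire format."""
    lines = []
    if event is not None:
        lines.append(f"event: {event}")     # named event type
    if event_id is not None:
        lines.append(f"id: {event_id}")     # client sends it back as Last-Event-ID on reconnect
    if retry_ms is not None:
        lines.append(f"retry: {retry_ms}")  # reconnection delay hint, in milliseconds
    # Multi-line payloads need one "data:" field per line
    for part in data.split("\n"):
        lines.append(f"data: {part}")
    # A blank line terminates the message
    return "\n".join(lines) + "\n\n"


if __name__ == "__main__":
    print(format_sse("AAPL 187.20", event="price", event_id="42"), end="")
```

A generator passed to a streaming response could yield the strings this helper returns instead of building them by hand.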

SSE vs. WebSockets

| Feature | SSE | WebSockets |
|---|---|---|
| Communication | Unidirectional (server to client) | Bidirectional (server and client) |
| Complexity | Simple | More complex |
| Connection Overhead | Lower | Higher |
| Use Cases | Real-time updates, notifications | Chat applications, gaming |

Implementing SSE in FastAPI

FastAPI makes it easy to implement SSE by leveraging its support for asynchronous programming and streaming responses.

Basic SSE Implementation

  1. Install FastAPI and Uvicorn:

    pip install fastapi uvicorn
  2. Create a FastAPI App with SSE:

    import asyncio
    import time
    
    from fastapi import FastAPI
    from fastapi.responses import StreamingResponse
    
    app = FastAPI()
    
    async def event_generator():
        # Each SSE message is a "data:" line terminated by a blank line
        while True:
            yield f"data: The current time is {time.ctime()}\n\n"
            # asyncio.sleep hands control back to the event loop;
            # time.sleep here would block every other request
            await asyncio.sleep(1)
    
    @app.get("/sse")
    async def sse_endpoint():
        return StreamingResponse(event_generator(), media_type="text/event-stream")
    
    if __name__ == "__main__":
        import uvicorn
        uvicorn.run(app, host="127.0.0.1", port=8000)

Testing the SSE Endpoint

Open your browser and navigate to http://127.0.0.1:8000/sse. You should see the server pushing updates to the client every second.
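
Outside the browser, a script can consume the same stream. The following is a simplified sketch of how a client might group the lines of a text/event-stream body into events. The function name parse_sse is hypothetical, and the parser handles only the common fields rather than every rule of the specification.

```python
from typing import Dict, Iterable, Iterator


def parse_sse(lines: Iterable[str]) -> Iterator[Dict[str, str]]:
    """Group the lines of a text/event-stream body into events."""
    event: Dict[str, str] = {}
    data = []
    for raw in lines:
        line = raw.rstrip("\r\n")
        if line == "":
            # Blank line: dispatch the event collected so far, if it has data
            if data:
                event["data"] = "\n".join(data)
                yield event
            event, data = {}, []
            continue
        if line.startswith(":"):
            continue  # comment line, often used as a keep-alive
        field, _, value = line.partition(":")
        if value.startswith(" "):
            value = value[1:]  # a single leading space is stripped
        if field == "data":
            data.append(value)
        elif field in ("event", "id", "retry"):
            event[field] = value


if __name__ == "__main__":
    sample = ["data: The current time is Mon Jul  1 14:32:00 2024\n", "\n"]
    for parsed in parse_sse(sample):
        print(parsed)
```

The lines argument can be any iterable of text lines, for example the line iterator of an HTTP client's streaming response.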

Use Case: Real-Time Trading Data with Asynchronous Threads

Problem Statement

For a trading application, real-time data updates are crucial. We need to handle multiple streams of data asynchronously to update clients with the latest trading information.

Solution

We can subclass threading.Thread and combine it with FastAPI: each thread handles the data stream for one trading symbol, and an SSE endpoint sends the updates to the client.

Implementation

  1. Enhanced SSE with Threading:

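    import asyncio
    import random
    import threading
    import time
    
    from fastapi import FastAPI, HTTPException
    from fastapi.responses import StreamingResponse
    
    app = FastAPI()
    
    class SSEThread(threading.Thread):
        """Background thread that keeps the latest simulated price for one symbol."""
    
        def __init__(self, symbol, update_interval):
            # daemon=True so the threads do not keep the process alive on shutdown
            super().__init__(daemon=True)
            self.symbol = symbol
            self.update_interval = update_interval
            self._lock = threading.Lock()
            self._latest = None
    
        def run(self):
            # Produce a new random price every update_interval seconds
            while True:
                price = random.uniform(100, 200)
                with self._lock:
                    self._latest = price
                time.sleep(self.update_interval)
    
        def get_data(self):
            # Format the most recent price as an SSE message (None before the first tick)
            with self._lock:
                price = self._latest
            if price is None:
                return None
            return f"data: {self.symbol} price is {price:.2f}\n\n"
    
    threads = {
        "AAPL": SSEThread("AAPL", 1),
        "GOOGL": SSEThread("GOOGL", 2),
    }
    
    for thread in threads.values():
        thread.start()
    
    async def event_generator(symbol):
        worker = threads[symbol]
        while True:
            message = worker.get_data()
            if message is not None:
                yield message
            # Non-blocking sleep, so the event loop can serve other clients
            await asyncio.sleep(worker.update_interval)
    
    @app.get("/sse/{symbol}")
    async def sse_endpoint(symbol: str):
        if symbol not in threads:
            raise HTTPException(status_code=404, detail="Symbol not found")
        return StreamingResponse(event_generator(symbol), media_type="text/event-stream")
    
    if __name__ == "__main__":
        import uvicorn
        uvicorn.run(app, host="127.0.0.1", port=8000)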

Explanation

  • Threading Class: The SSEThread class is defined to create a thread for each trading symbol. It generates random trading data for demonstration purposes.
  • Threads Dictionary: A dictionary to hold instances of SSEThread for different symbols.
  • Event Generator: An asynchronous generator function that fetches data from the respective thread.
  • Endpoint: The SSE endpoint /sse/{symbol} streams real-time data for the given trading symbol.
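
The event generator above pulls data from the thread. A common alternative, shown here only as a sketch and not as what the code above does, is to let the thread push each update into an asyncio.Queue through loop.call_soon_threadsafe, so the async side waits for data without blocking the event loop. The names producer, event_stream, and collect are hypothetical, and the short sleep stands in for a real data feed.

```python
import asyncio
import threading
import time


def producer(loop: asyncio.AbstractEventLoop, queue: asyncio.Queue, count: int) -> None:
    """Runs in a worker thread and hands each tick to the event loop."""
    for i in range(count):
        time.sleep(0.01)  # stand-in for waiting on a real data feed
        # asyncio.Queue is not thread-safe, so schedule the put on the loop's own thread
        loop.call_soon_threadsafe(queue.put_nowait, f"data: tick {i}\n\n")
    loop.call_soon_threadsafe(queue.put_nowait, None)  # sentinel: no more data


async def event_stream(count: int):
    """Async generator that yields SSE messages produced by the thread."""
    loop = asyncio.get_running_loop()
    queue: asyncio.Queue = asyncio.Queue()
    threading.Thread(target=producer, args=(loop, queue, count), daemon=True).start()
    while True:
        message = await queue.get()  # suspends this coroutine, not the event loop
        if message is None:
            break
        yield message


async def collect(count: int) -> list:
    # Helper for trying the generator outside a web server
    return [message async for message in event_stream(count)]


if __name__ == "__main__":
    print(asyncio.run(collect(3)))
```

In a FastAPI app, an async generator like event_stream could be passed to StreamingResponse in the same way as the event generator in the implementation above.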

DAG

graph TD
    Start --> InitFastAPI[Initialize FastAPI App]
    InitFastAPI --> DefineThreads[Define Threads for Trading Symbols]
    DefineThreads --> StartThreads[Start Threads]
    StartThreads --> SSEEndpoint[Define SSE Endpoint]
    SSEEndpoint --> RequestClient[Client Requests /sse/symbol]
    RequestClient --> CheckSymbol[Check if Symbol Exists]
    CheckSymbol -->|Symbol Exists| GetDataThread[Fetch Data from Thread]
    GetDataThread --> SendDataClient[Send Data to Client via SSE]
    CheckSymbol -->|Symbol Not Found| ReturnError[Return Error Response]

    subgraph ThreadManagement
        DefineThreads --> Thread1[SSEThread for AAPL]
        DefineThreads --> Thread2[SSEThread for GOOGL]
        Thread1 --> GenerateData1[Generate Data for AAPL]
        Thread2 --> GenerateData2[Generate Data for GOOGL]
        StartThreads --> StartThread1[Start Thread for AAPL]
        StartThreads --> StartThread2[Start Thread for GOOGL]
        GetDataThread --> FetchData1[Fetch Data from AAPL Thread]
        GetDataThread --> FetchData2[Fetch Data from GOOGL Thread]
    end

    subgraph DataFlow
        RequestClient --> EventGenerator[Call Event Generator]
        EventGenerator --> FetchDataThread[Fetch Data from Corresponding Thread]
        FetchDataThread --> StreamingResponse[Return Streaming Response]
        StreamingResponse --> ClientUpdates[Client Receives Real-time Updates]
    end

Testing the Enhanced SSE

Open your browser and navigate to http://127.0.0.1:8000/sse/AAPL and http://127.0.0.1:8000/sse/GOOGL. You should see real-time updates for both trading symbols.

Conclusion

Server-Sent Events (SSE) provide a straightforward way to push real-time updates from the server to the client. By integrating SSE with FastAPI and background threads, we can serve multiple streams of data concurrently, which makes the approach a good fit for real-time trading applications. Clients receive updates as soon as the server produces them, which improves the responsiveness and user experience of trading platforms.
