Skip to content

Instantly share code, notes, and snippets.

@polyscone
Created March 20, 2025 00:07
package main
import (
"fmt"
"log/slog"
"net/http"
"slices"
"sync"
"time"
)
var sse struct {
mu sync.Mutex
listeners []chan<- Event
}
func addListener(listener chan<- Event) {
sse.mu.Lock()
defer sse.mu.Unlock()
sse.listeners = append(sse.listeners, listener)
}
func removeListener(listener chan<- Event) {
sse.mu.Lock()
defer sse.mu.Unlock()
slices.DeleteFunc(sse.listeners, func(el chan<- Event) bool {
return el == listener
})
}
type Event struct {
Name string
Data string
}
// NOTES:
//
// An SSE client will send the last "id" it received in a "last-event-id" header
// This can be used to send any missed events etc. if needed
// Only sent if the client received an id as part of a previous event
// This can be spoofed, so check the user is authorised and validate the id
//
// HTTP/1.1 is limited to 6 simultaneous connections to a single domain
// HTTP/2 doesn't have that limit and defaults to 100; the HTTP library being used
// may allow configuration of this value
// Since Go 1.24 this is the `HTTP2` field's `MaxConcurrentStreams` value in `http.Server`
//
// Disconnecting a client from the server side will always cause the client to try
// and reconnect
// Send a 204 to tell the client not to reconnect, or create your own custom disconnect event
// that the client uses to close its connection
//
// Clients may disconnect and not reconnect when the server responds with an error, so
// clients need to detect those errors and manually reconnect in those cases
// Clients may also only try to reconnect once and then give up, so they should also detect
// those cases if they need to keep the connection alive
// Try to introduce some randomness on reconnects that could lead to a "trampling herd" problem
//
// Ensure any reverse proxies won't kill long-running connections
//
// Spec: https://html.spec.whatwg.org/multipage/server-sent-events.html
func sseHandler(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
rc := http.NewResponseController(w)
// Headers for SSE
w.Header().Set("content-type", "text/event-stream")
w.Header().Set("cache-control", "no-cache")
w.Header().Set("connection", "keep-alive")
w.WriteHeader(http.StatusOK)
// Make sure the client gets the headers as quickly as possible
if err := rc.Flush(); err != nil {
slog.Error("sse: headers: could not flush data", "error", err)
return
}
events := make(chan Event, 100)
addListener(events)
defer removeListener(events)
heartbeat := 30 * time.Second
keepalive := time.NewTicker(heartbeat)
for {
select {
case event := <-events:
// Write the event
fmt.Fprintf(w, "event: %v\n", event.Name)
fmt.Fprintf(w, "data: %v\n", event.Data)
fmt.Fprintf(w, "\n\n")
// Make sure the client gets the event
if err := rc.Flush(); err != nil {
slog.Error("sse: event: could not flush data", "error", err)
return
}
case <-keepalive.C:
// Push the TCP deadline for writes forward
rc.SetWriteDeadline(time.Now().Add(heartbeat * 2))
// Send a comment to keep the connection alive with legacy proxies etc.
fmt.Fprintf(w, ":\n\n")
// Make sure the client gets the comment
if err := rc.Flush(); err != nil {
slog.Error("sse: keepalive: could not flush data", "error", err)
return
}
case <-ctx.Done():
// The client closed the connection, or we timed out or something
return
}
}
}
func main() {
http.HandleFunc("/", sseHandler)
fmt.Println("Listening...")
http.ListenAndServe("localhost:8080", nil)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment