/sse_example.go Secret
Created
March 20, 2025 00:07
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import (
	"fmt"
	"log/slog"
	"net/http"
	"slices"
	"strings"
	"sync"
	"time"
)
var sse struct { | |
mu sync.Mutex | |
listeners []chan<- Event | |
} | |
func addListener(listener chan<- Event) { | |
sse.mu.Lock() | |
defer sse.mu.Unlock() | |
sse.listeners = append(sse.listeners, listener) | |
} | |
func removeListener(listener chan<- Event) { | |
sse.mu.Lock() | |
defer sse.mu.Unlock() | |
slices.DeleteFunc(sse.listeners, func(el chan<- Event) bool { | |
return el == listener | |
}) | |
} | |
// Event is a single server-sent event: Name is written to the stream's
// "event:" field and Data to its "data:" field.
type Event struct {
	Name, Data string
}
// NOTES: | |
// | |
// An SSE client will send the last "id" it received in a "last-event-id" header | |
// This can be used to send any missed events etc. if needed | |
// Only sent if the client received an id as part of a previous event | |
// This can be spoofed, so check the user is authorised and validate the id | |
// | |
// HTTP/1.1 is limited to 6 simultaneous connections to a single domain | |
// HTTP/2 doesn't have that limit and defaults to 100; the HTTP library being used | |
// may allow configuration of this value | |
// Since Go 1.24 this is the `HTTP2` field's `MaxConcurrentStreams` value in `http.Server` | |
// | |
// Disconnecting a client from the server side will always cause the client to try | |
// and reconnect | |
// Send a 204 to tell the client not to reconnect, or create your own custom disconnect event | |
// that the client uses to close its connection | |
// | |
// Clients may disconnect and not reconnect when the server responds with an error, so | |
// clients need to detect those errors and manually reconnect in those cases | |
// Clients may also only try to reconnect once and then give up, so they should also detect | |
// those cases if they need to keep the connection alive | |
// Try to introduce some randomness (jitter) into reconnect delays; otherwise many clients
// reconnecting at the same moment can cause a "thundering herd" problem
// | |
// Ensure any reverse proxies won't kill long-running connections | |
// | |
// Spec: https://html.spec.whatwg.org/multipage/server-sent-events.html | |
func sseHandler(w http.ResponseWriter, r *http.Request) { | |
ctx := r.Context() | |
rc := http.NewResponseController(w) | |
// Headers for SSE | |
w.Header().Set("content-type", "text/event-stream") | |
w.Header().Set("cache-control", "no-cache") | |
w.Header().Set("connection", "keep-alive") | |
w.WriteHeader(http.StatusOK) | |
// Make sure the client gets the headers as quickly as possible | |
if err := rc.Flush(); err != nil { | |
slog.Error("sse: headers: could not flush data", "error", err) | |
return | |
} | |
events := make(chan Event, 100) | |
addListener(events) | |
defer removeListener(events) | |
heartbeat := 30 * time.Second | |
keepalive := time.NewTicker(heartbeat) | |
for { | |
select { | |
case event := <-events: | |
// Write the event | |
fmt.Fprintf(w, "event: %v\n", event.Name) | |
fmt.Fprintf(w, "data: %v\n", event.Data) | |
fmt.Fprintf(w, "\n\n") | |
// Make sure the client gets the event | |
if err := rc.Flush(); err != nil { | |
slog.Error("sse: event: could not flush data", "error", err) | |
return | |
} | |
case <-keepalive.C: | |
// Push the TCP deadline for writes forward | |
rc.SetWriteDeadline(time.Now().Add(heartbeat * 2)) | |
// Send a comment to keep the connection alive with legacy proxies etc. | |
fmt.Fprintf(w, ":\n\n") | |
// Make sure the client gets the comment | |
if err := rc.Flush(); err != nil { | |
slog.Error("sse: keepalive: could not flush data", "error", err) | |
return | |
} | |
case <-ctx.Done(): | |
// The client closed the connection, or we timed out or something | |
return | |
} | |
} | |
} | |
func main() { | |
http.HandleFunc("/", sseHandler) | |
fmt.Println("Listening...") | |
http.ListenAndServe("localhost:8080", nil) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment