@Ananto30
Last active April 26, 2024 08:58
SSE message stream in Go
package main

import (
	"encoding/json"
	"fmt"
	"log"
	"net/http"

	"github.com/gorilla/mux"
)

// Example SSE server in Golang.
// $ go run sse.go
// Inspired by https://gist.github.com/ismasan/3fb75381cd2deb6bfa9c

type Broker struct {
	// Events are pushed to this channel by the main events-gathering routine
	Notifier chan []byte
	// New client connections
	newClients chan chan []byte
	// Closed client connections
	closingClients chan chan []byte
	// Client connections registry
	clients map[chan []byte]bool
}

func NewServer() *Broker {
	// Instantiate a broker
	broker := &Broker{
		Notifier:       make(chan []byte, 1),
		newClients:     make(chan chan []byte),
		closingClients: make(chan chan []byte),
		clients:        make(map[chan []byte]bool),
	}
	// Set it running - listening and broadcasting events
	go broker.listen()
	return broker
}

type Message struct {
	Name    string `json:"name"`
	Message string `json:"msg"`
}

func (broker *Broker) Stream(w http.ResponseWriter, r *http.Request) {
	flusher, ok := w.(http.Flusher)
	if !ok {
		http.Error(w, "Streaming unsupported!", http.StatusInternalServerError)
		return
	}

	w.Header().Set("Content-Type", "text/event-stream")
	w.Header().Set("Cache-Control", "no-cache")
	w.Header().Set("Connection", "keep-alive")
	w.Header().Set("Access-Control-Allow-Origin", "*")

	// Each connection registers its own message channel with the Broker's connections registry
	messageChan := make(chan []byte)

	// Signal the broker that we have a new connection
	broker.newClients <- messageChan

	// Remove this client from the map of connected clients when this
	// handler exits. This is the only place the client is unregistered.
	defer func() {
		broker.closingClients <- messageChan
	}()

	for {
		select {
		case <-r.Context().Done():
			// The client went away. http.CloseNotifier is deprecated;
			// the request context reports the same event.
			return
		case msg := <-messageChan:
			// Write to the ResponseWriter
			// Server Sent Events compatible
			fmt.Fprintf(w, "data: %s\n\n", msg)
			// Flush the data immediately instead of buffering it for later.
			flusher.Flush()
		}
	}
}

func (broker *Broker) listen() {
	for {
		select {
		case s := <-broker.newClients:
			// A new client has connected.
			// Register their message channel
			broker.clients[s] = true
			log.Printf("Client added. %d registered clients", len(broker.clients))
		case s := <-broker.closingClients:
			// A client has detached and we want to
			// stop sending them messages.
			delete(broker.clients, s)
			log.Printf("Removed client. %d registered clients", len(broker.clients))
		case event := <-broker.Notifier:
			// We got a new event from the outside!
			// Send event to all connected clients
			for clientMessageChan := range broker.clients {
				clientMessageChan <- event
			}
		}
	}
}
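
func (broker *Broker) BroadcastMessage(w http.ResponseWriter, r *http.Request) {
	var msg Message
	// Reject bodies that are not valid JSON instead of broadcasting an empty message
	if err := json.NewDecoder(r.Body).Decode(&msg); err != nil {
		http.Error(w, "invalid JSON body", http.StatusBadRequest)
		return
	}

	// Re-encode the message and hand it to the broker for broadcasting
	j, err := json.Marshal(msg)
	if err != nil {
		http.Error(w, "could not encode message", http.StatusInternalServerError)
		return
	}
	broker.Notifier <- j

	// Echo the accepted message back to the sender
	w.Header().Set("Content-Type", "application/json")
	json.NewEncoder(w).Encode(msg)
}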

func main() {
	broker := NewServer()

	router := mux.NewRouter()
	router.HandleFunc("/messages", broker.BroadcastMessage).Methods("POST")
	router.HandleFunc("/stream", broker.Stream).Methods("GET")

	log.Fatal(http.ListenAndServe(":8000", router))
}

Ananto30 (author) commented Nov 4, 2023

Hi @benc-uk, thanks. This is actually an old example (and most of it is copied :D). I wanted to update it but kept forgetting (your comment reminded me!). We might want to use a sync.Map for the clients registry. We could also add a channel example, i.e. subscribe to /channel/<name> and publish to /channel/<name>, which would be a more useful example :D
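
The /channel/<name> idea above could look roughly like the following. This is a minimal sketch, not the gist's code: `Hub`, `NewHub`, `Subscribe`, and `Publish` are hypothetical names. It uses a plain map guarded by a sync.RWMutex rather than the sync.Map mentioned above, because each channel name maps to a nested set of subscribers, and it assumes that dropping messages for slow subscribers is acceptable.

```go
package main

import "sync"

// Hub is a hypothetical registry of named channels; it is not part of the gist.
type Hub struct {
	mu     sync.RWMutex
	topics map[string]map[chan []byte]struct{}
}

// NewHub returns an empty Hub.
func NewHub() *Hub {
	return &Hub{topics: make(map[string]map[chan []byte]struct{})}
}

// Subscribe registers a buffered channel under name. It returns the receive
// side of that channel and a function that removes the subscription.
func (h *Hub) Subscribe(name string) (<-chan []byte, func()) {
	ch := make(chan []byte, 8) // buffer size is an arbitrary assumption

	h.mu.Lock()
	if h.topics[name] == nil {
		h.topics[name] = make(map[chan []byte]struct{})
	}
	h.topics[name][ch] = struct{}{}
	h.mu.Unlock()

	// sync.Once makes it safe to call unsubscribe more than once.
	var once sync.Once
	unsubscribe := func() {
		once.Do(func() {
			h.mu.Lock()
			delete(h.topics[name], ch)
			if len(h.topics[name]) == 0 {
				delete(h.topics, name)
			}
			h.mu.Unlock()
		})
	}
	return ch, unsubscribe
}

// Publish offers msg to every subscriber of name and returns how many
// accepted it. A subscriber whose buffer is full is skipped, so one slow
// client cannot block the others.
func (h *Hub) Publish(name string, msg []byte) int {
	h.mu.RLock()
	defer h.mu.RUnlock()

	delivered := 0
	for ch := range h.topics[name] {
		select {
		case ch <- msg:
			delivered++
		default:
			// Subscriber is not keeping up; drop this message for it.
		}
	}
	return delivered
}
```

With gorilla/mux, a GET handler on a route such as /channel/{name} could read the name with mux.Vars(r)["name"], call Subscribe, defer the returned unsubscribe function, and write each received message as a data: frame. A POST handler on the same route would call Publish with the request body.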


MagicMajky commented Apr 26, 2024

Hey, thanks for the implementation @Ananto30, it's great!

One thing: http.CloseNotifier, used in notify := w.(http.CloseNotifier).CloseNotify() in Broker.Stream(), is deprecated now. Could you update the example to use [Request.Context] instead?

For me this is the first result when I search for "gorilla mux sse" on Google, so it would be nice to have this example up to date.

I would open an MR, but I don't know how to do that here.
Here is my suggestion for how the updated code could look:

for {
    select {
    // Listen to connection close and un-register messageChan
    case <-r.Context().Done():
        // remove this client from the map of connected clients
        broker.closingClients <- messageChan
        return
	
    // Listen for incoming messages from messageChan
    case msg := <-messageChan:
        // Write to the ResponseWriter
        // Server Sent Events compatible
        fmt.Fprintf(w, "data: %s\n\n", msg)
        // Flush the data immediately instead of buffering it for later.
        flusher.Flush()
    }
}

I also have a question. If we return from the Broker.Stream() method when the connection is closed and we send messageChan to broker.closingClients manually, does it make sense to also send it in the deferred function? Aren't we sending it twice?
I think we should do one or the other: either just return when the connection closes and let the defer handle unregistering the client from the broker, or unregister the client manually when the connection closes and not use defer. This is not reflected in my suggestion though, so if you are going to update the example, could you also take a look at this?

Have a nice day and thanks for your work!
