Skip to content

Instantly share code, notes, and snippets.

@Ananto30
Last active April 5, 2024 17:47
Show Gist options
  • Star 5 You must be signed in to star a gist
  • Fork 2 You must be signed in to fork a gist
  • Save Ananto30/8af841f250e89c07e122e2a838698246 to your computer and use it in GitHub Desktop.
Save Ananto30/8af841f250e89c07e122e2a838698246 to your computer and use it in GitHub Desktop.
SSE message stream in Go
package main
import (
"encoding/json"
"fmt"
"log"
"net/http"
"github.com/gorilla/mux"
)
// Example SSE server in Golang.
// $ go run sse.go
// Inspired from https://gist.github.com/ismasan/3fb75381cd2deb6bfa9c
// Broker holds the channels and the client registry used to fan events out
// to connected SSE clients. Each client is identified by its own
// chan []byte.
type Broker struct {
	// Notifier is the inbound event channel: whatever is sent here is
	// forwarded to every registered client channel.
	Notifier chan []byte

	// newClients carries the channels of clients that have just connected;
	// closingClients carries the channels of clients that are going away.
	newClients, closingClients chan chan []byte

	// clients is the set of currently registered client channels.
	clients map[chan []byte]bool
}
// NewServer allocates a Broker, creates its channels and client registry,
// starts its listen loop in a background goroutine, and returns the broker.
func NewServer() (broker *Broker) {
	broker = new(Broker)

	// Notifier has room for one pending event; the registration channels
	// are unbuffered.
	broker.Notifier = make(chan []byte, 1)
	broker.newClients = make(chan chan []byte)
	broker.closingClients = make(chan chan []byte)
	broker.clients = make(map[chan []byte]bool)

	// Start handling registrations and broadcasts.
	go broker.listen()

	return broker
}
// Message is the JSON payload accepted by the broadcast endpoint and relayed
// to stream clients.
type Message struct {
	// Name is encoded under the JSON key "name".
	Name string `json:"name"`

	// Message is encoded under the JSON key "msg".
	Message string `json:"msg"`
}
// Stream is the Server-Sent Events endpoint. It registers a dedicated message
// channel with the broker and writes every event received on it to the client
// as a "data: ...\n\n" frame, flushing after each one.
//
// The handler returns (and unregisters its channel exactly once) when the
// request context is cancelled — which net/http does when the client
// connection closes — or when writing to the client fails. If the
// ResponseWriter does not implement http.Flusher it replies with a 500.
func (broker *Broker) Stream(w http.ResponseWriter, r *http.Request) {
	flusher, ok := w.(http.Flusher)
	if !ok {
		http.Error(w, "Streaming unsupported!", http.StatusInternalServerError)
		return
	}

	w.Header().Set("Content-Type", "text/event-stream")
	w.Header().Set("Cache-Control", "no-cache")
	w.Header().Set("Connection", "keep-alive")
	w.Header().Set("Access-Control-Allow-Origin", "*")

	// Use the request context instead of the deprecated http.CloseNotifier:
	// no unchecked type assertion and no extra goroutine per connection.
	ctx := r.Context()

	// Each connection registers its own message channel with the broker.
	messageChan := make(chan []byte)

	// Signal the broker that we have a new connection, unless the client has
	// already gone away while we were waiting to register.
	select {
	case broker.newClients <- messageChan:
	case <-ctx.Done():
		return
	}

	// Unregister when this handler exits. The broker may be in the middle of
	// a blocking send to messageChan at that moment, so keep draining it
	// until the broker accepts the unregister request; otherwise both sides
	// would wait on each other forever.
	defer func() {
		for {
			select {
			case broker.closingClients <- messageChan:
				return
			case <-messageChan:
				// Discard: nobody is left to receive this event.
			}
		}
	}()

	// Push the response headers out now so the client sees the stream as
	// open before the first event arrives.
	flusher.Flush()

	for {
		select {
		case <-ctx.Done():
			// Client disconnected (or the request was cancelled).
			return
		case msg := <-messageChan:
			// Server-Sent Events frame: one data line, then a blank line.
			if _, err := fmt.Fprintf(w, "data: %s\n\n", msg); err != nil {
				// The connection is unusable; stop streaming.
				return
			}
			// Flush the data immediately instead of buffering it for later.
			flusher.Flush()
		}
	}
}
// listen is the broker's event loop and never returns. On each iteration it
// handles exactly one of: an event to broadcast, a client registration, or a
// client removal. Within this file it is the only code that reads or writes
// the clients map after construction.
func (broker *Broker) listen() {
	for {
		select {
		case event := <-broker.Notifier:
			// Fan the event out to every registered client. Each send
			// blocks until that client receives it.
			for ch := range broker.clients {
				ch <- event
			}

		case joined := <-broker.newClients:
			// A client connected: add its channel to the registry.
			broker.clients[joined] = true
			log.Printf("Client added. %d registered clients", len(broker.clients))

		case left := <-broker.closingClients:
			// A client detached: stop sending it events.
			delete(broker.clients, left)
			log.Printf("Removed client. %d registered clients", len(broker.clients))
		}
	}
}
// BroadcastMessage decodes a Message from the JSON request body, pushes its
// JSON encoding onto the broker's Notifier channel for delivery to all stream
// clients, and echoes the message back to the caller as JSON.
//
// A body that is not valid JSON for Message is rejected with 400 Bad Request
// and nothing is broadcast. If the request is cancelled while waiting for the
// broker to accept the event, the handler returns without responding.
func (broker *Broker) BroadcastMessage(w http.ResponseWriter, r *http.Request) {
	var msg Message
	if err := json.NewDecoder(r.Body).Decode(&msg); err != nil {
		// Previously the error was dropped and an empty message was
		// broadcast to every client.
		http.Error(w, "invalid JSON body: "+err.Error(), http.StatusBadRequest)
		return
	}

	payload, err := json.Marshal(msg)
	if err != nil {
		http.Error(w, "encoding message: "+err.Error(), http.StatusInternalServerError)
		return
	}

	// Hand the event to the broker, but do not hang forever if the caller
	// gives up while the broker is busy.
	select {
	case broker.Notifier <- payload:
	case <-r.Context().Done():
		return
	}

	w.Header().Set("Content-Type", "application/json")
	if err := json.NewEncoder(w).Encode(msg); err != nil {
		// Headers are already sent; all we can do is record the failure.
		log.Printf("writing broadcast response: %v", err)
	}
}
// init is intentionally empty; the program needs no package-level setup.
func init() {}
// main wires the broker's handlers into a gorilla/mux router and serves HTTP
// on port 8000:
//
//	GET  /stream   - subscribe to the event stream
//	POST /messages - broadcast a message to all subscribers
func main() {
	sse := NewServer()

	routes := mux.NewRouter()
	routes.HandleFunc("/stream", sse.Stream).Methods("GET")
	routes.HandleFunc("/messages", sse.BroadcastMessage).Methods("POST")

	// ListenAndServe only returns on failure, and always with a non-nil error.
	if err := http.ListenAndServe(":8000", routes); err != nil {
		log.Fatal(err)
	}
}
@benc-uk
Copy link

benc-uk commented Nov 4, 2023

Hey, this is absolutely fantastic. Saved me hours (probably days!) of hassle & pain juggling channels and connect closing problems.
Thanks for sharing. Nice clear code too!

@Ananto30
Copy link
Author

Ananto30 commented Nov 4, 2023

Hi @benc-uk, thanks! This is actually an old example (and most of it is copied :D). I wanted to update it but kept forgetting (your comment reminded me!). We might want to use a sync.Map for the clients registry. We could also add a channel example, i.e. subscribe to /channel/<name> and publish to /channel/<name>, which would be a more useful example :D

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment