@schmohlio
Forked from ismasan/sse.go
Last active Sep 21, 2022
Example SSE server in Golang
// v2 of the great example of SSE in go by @ismasan.
// includes fixes:
// * infinite loop ending in panic
// * closing a client twice
// * potentially blocked listen() from closing a connection during multiplex step.
package main

import (
	"fmt"
	"log"
	"net/http"
	"time"
)

// the amount of time to wait when pushing a message to
// a slow client or a client that closed after `range clients` started.
const patience time.Duration = 1 * time.Second

// Example SSE server in Golang.
// $ go run sse.go

type Broker struct {
	// Events are pushed to this channel by the main events-gathering routine
	Notifier chan []byte

	// New client connections
	newClients chan chan []byte

	// Closed client connections
	closingClients chan chan []byte

	// Client connections registry
	clients map[chan []byte]bool
}

func NewServer() (broker *Broker) {
	// Instantiate a broker
	broker = &Broker{
		Notifier:       make(chan []byte, 1),
		newClients:     make(chan chan []byte),
		closingClients: make(chan chan []byte),
		clients:        make(map[chan []byte]bool),
	}

	// Set it running - listening and broadcasting events
	go broker.listen()

	return
}

func (broker *Broker) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
	// Make sure that the writer supports flushing.
	flusher, ok := rw.(http.Flusher)
	if !ok {
		http.Error(rw, "Streaming unsupported!", http.StatusInternalServerError)
		return
	}

	rw.Header().Set("Content-Type", "text/event-stream")
	rw.Header().Set("Cache-Control", "no-cache")
	rw.Header().Set("Connection", "keep-alive")
	rw.Header().Set("Access-Control-Allow-Origin", "*")

	// Each connection registers its own message channel with the Broker's connections registry
	messageChan := make(chan []byte)

	// Signal the broker that we have a new connection
	broker.newClients <- messageChan

	// Remove this client from the map of connected clients
	// when this handler exits.
	defer func() {
		broker.closingClients <- messageChan
	}()

	// The request context is cancelled when the client connection closes.
	// http.CloseNotifier is deprecated, so the context is used instead.
	done := req.Context().Done()

	for {
		// Wait for either a disconnect or the next message, so that a
		// disconnect is noticed even while no messages are arriving.
		select {
		case <-done:
			return
		case msg := <-messageChan:
			// Write to the ResponseWriter
			// Server Sent Events compatible
			fmt.Fprintf(rw, "data: %s\n\n", msg)
			// Flush the data immediately instead of buffering it for later.
			flusher.Flush()
		}
	}
}

func (broker *Broker) listen() {
	for {
		select {
		case s := <-broker.newClients:
			// A new client has connected.
			// Register their message channel
			broker.clients[s] = true
			log.Printf("Client added. %d registered clients", len(broker.clients))

		case s := <-broker.closingClients:
			// A client has detached and we want to
			// stop sending them messages.
			delete(broker.clients, s)
			log.Printf("Removed client. %d registered clients", len(broker.clients))

		case event := <-broker.Notifier:
			// We got a new event from the outside!
			// Send event to all connected clients,
			// giving up on each one after `patience`.
			for clientMessageChan := range broker.clients {
				select {
				case clientMessageChan <- event:
				case <-time.After(patience):
					log.Print("Skipping client.")
				}
			}
		}
	}
}

func main() {
	broker := NewServer()

	go func() {
		for {
			time.Sleep(time.Second * 2)
			eventString := fmt.Sprintf("the time is %v", time.Now())
			log.Println("Receiving event")
			broker.Notifier <- []byte(eventString)
		}
	}()

	log.Fatal("HTTP server error: ", http.ListenAndServe("localhost:3000", broker))
}
@jeremylowery

jeremylowery commented Jan 30, 2017

This is a much better implementation than the original. Great work!

@syndbg

syndbg commented May 3, 2017

Note that here https://gist.github.com/schmohlio/d7bdb255ba61d3f5e51a512a7c0d6a85#file-sse-go-L108 you can have a race condition and would definitely benefit from using a mutex.

@zhaohansprt

zhaohansprt commented Jul 18, 2017

I think there is no need for a mutex: the func listen runs sequentially, so there is no race condition.

@mirosval

mirosval commented Oct 27, 2017

I think this is susceptible to dropped connections. If the client closes the connection properly, the CloseNotifier should handle that, but if the connection is just severed, there is no way to recover from that. The writes keep on succeeding, but there is nobody to receive them.

@waylandc

waylandc commented Dec 5, 2018

Why does the clients map contain booleans for the value?

@Embiggenerd

Embiggenerd commented Jul 13, 2019

Booleans are there to have some kind of value that doesn't take up much space. Should probably be using an empty struct.
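
A minimal sketch of the empty-struct variant, assuming the registry is only ever used as a set. `clientSet` and its methods are hypothetical names, not part of the gist. `struct{}` occupies zero bytes, and membership is expressed by the key alone.

```go
package main

import "fmt"

// clientSet is a hypothetical stand-in for the broker's clients registry.
// The empty struct value carries no data; only the keys matter.
type clientSet map[chan []byte]struct{}

func (s clientSet) add(c chan []byte)    { s[c] = struct{}{} }
func (s clientSet) remove(c chan []byte) { delete(s, c) }

func main() {
	clients := clientSet{}
	a, b := make(chan []byte), make(chan []byte)
	clients.add(a)
	clients.add(b)
	clients.remove(a)

	_, ok := clients[b]
	fmt.Println(len(clients), ok) // prints "1 true"
}
```

The broadcast loop is unaffected, since `for c := range clients` iterates over keys either way.
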

@dani-91

dani-91 commented Nov 29, 2019

Hi, I tried it and it works but only on localhost with JavaScript EventSource-Library. If I use it with http://myhostname:3000 then it doesn't work. What could be the problem?

@tlossen

tlossen commented Feb 18, 2020

@dani-91 maybe CORS?
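
Another possibility worth ruling out, offered as a guess about this report: the gist passes `"localhost:3000"` to `http.ListenAndServe`, which binds only to the loopback interface, so requests addressed to the machine's hostname may never reach the server. The sketch below uses a hypothetical helper, `loopbackOnly`, to show the difference between a loopback address and an address with an empty host such as `":3000"`.

```go
package main

import (
	"fmt"
	"net"
)

// loopbackOnly is a hypothetical helper (not part of the gist). It opens a TCP
// listener on addr and reports whether the bound address is a loopback address.
func loopbackOnly(addr string) (bool, error) {
	ln, err := net.Listen("tcp", addr)
	if err != nil {
		return false, err
	}
	defer ln.Close()

	tcpAddr, ok := ln.Addr().(*net.TCPAddr)
	if !ok {
		return false, fmt.Errorf("unexpected address type %T", ln.Addr())
	}
	return tcpAddr.IP.IsLoopback(), nil
}

func main() {
	// Port 0 asks the OS for any free port, so this does not clash with a running server.
	for _, addr := range []string{"127.0.0.1:0", ":0"} {
		local, err := loopbackOnly(addr)
		fmt.Printf("%q loopback only: %v (err: %v)\n", addr, local, err)
	}
}
```

If this turns out to be the cause, changing the call to `http.ListenAndServe(":3000", broker)` would make the server reachable on all interfaces, subject to any firewall in between.
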

@goliveok

goliveok commented Apr 12, 2020

Hi all, I need to take one live source (response, err := http.Get) and fan it out to multiple connections.
