Skip to content

Instantly share code, notes, and snippets.

@rikonor
Created January 19, 2019 02:43
Show Gist options
  • Star 18 You must be signed in to star a gist
  • Fork 2 You must be signed in to fork a gist
  • Save rikonor/e53a33c27ed64861c91a095a59f0aa44 to your computer and use it in GitHub Desktop.
Save rikonor/e53a33c27ed64861c91a095a59f0aa44 to your computer and use it in GitHub Desktop.
Server-Sent Events (SSE) Example in Go
package main
import (
"fmt"
"log"
"net/http"
"sync"
"time"
)
// main starts a demo Server-Sent Events server on :8001.
//
// A background goroutine publishes the current time (RFC 3339) through the
// notification center once per second, and every client connected to /sse
// receives those timestamps as SSE messages.
func main() {
	nc := NewNotificationCenter()

	// Publisher: runs for the lifetime of the process.
	go func() {
		for {
			b := []byte(time.Now().Format(time.RFC3339))
			if err := nc.Notify(b); err != nil {
				log.Fatal(err)
			}
			time.Sleep(1 * time.Second)
		}
	}()

	http.HandleFunc("/sse", handleSSE(nc))

	// ListenAndServe only returns when the server could not start or has
	// stopped, and always with a non-nil error. Report it instead of
	// exiting silently with status 0 (e.g. when the port is already taken).
	log.Fatal(http.ListenAndServe(":8001", nil))
}
// UnsubscribeFunc is the cancellation handle returned by Subscribe. Calling it
// ends the subscription it was returned for; it reports an error if that
// fails.
type UnsubscribeFunc func() error
// Subscriber is a source of messages that a caller can attach a channel to.
// It is the dependency handleSSE needs; NotificationCenter implements it.
type Subscriber interface {
// Subscribe registers c to receive messages and returns the function that
// ends the subscription.
Subscribe(c chan []byte) (UnsubscribeFunc, error)
}
// handleSSE returns a handler that streams every message published through s
// to the client as Server-Sent Events ("data: <payload>\n\n"), flushing after
// each one.
//
// The handler replies with 500 and never subscribes if the ResponseWriter
// cannot flush, and with 500 if subscribing fails. Otherwise it streams until
// the request context is cancelled (typically: the client disconnected) or a
// write fails, and always unsubscribes before returning.
func handleSSE(s Subscriber) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		// Streaming depends on explicit flushes. Check for support up front so
		// an unsupported writer yields a clean 500 instead of a panic later,
		// and so no subscription is created that would need cleaning up.
		flusher, ok := w.(http.Flusher)
		if !ok {
			http.Error(w, "streaming unsupported", http.StatusInternalServerError)
			return
		}

		// Subscribe
		c := make(chan []byte)
		unsubscribeFn, err := s.Subscribe(c)
		if err != nil {
			http.Error(w, err.Error(), http.StatusInternalServerError)
			return
		}
		// Unsubscribe on every exit path. By then the response has already
		// started, so a failure can only be logged, not sent as an HTTP error.
		defer func() {
			if err := unsubscribeFn(); err != nil {
				log.Printf("sse: unsubscribe: %v", err)
			}
		}()

		// Signal SSE Support
		w.Header().Set("Content-Type", "text/event-stream")
		w.Header().Set("Cache-Control", "no-cache")
		w.Header().Set("Connection", "keep-alive")
		w.Header().Set("Access-Control-Allow-Origin", "*")

		ctx := r.Context()
		for {
			// Wait on both events at once: a blocking receive inside a
			// default branch would keep the handler (and its subscription)
			// alive after the client left until the next message arrived.
			select {
			case <-ctx.Done():
				return
			case b := <-c:
				if _, err := fmt.Fprintf(w, "data: %s\n\n", b); err != nil {
					// The connection is unusable; stop streaming.
					return
				}
				flusher.Flush()
			}
		}
	}
}
// Notifier is the publishing side of a message source. NotificationCenter
// implements it.
type Notifier interface {
// Notify publishes b and reports an error if publishing fails.
Notify(b []byte) error
}
// NotificationCenter fans a published message out to every subscribed
// channel. Create one with NewNotificationCenter; the zero value has a nil
// map and a nil mutex.
type NotificationCenter struct {
// subscribers is the set of channels currently registered via Subscribe.
subscribers map[chan []byte]struct{}
// subscribersMu is locked around every read and write of subscribers.
subscribersMu *sync.Mutex
}
// NewNotificationCenter returns a ready-to-use NotificationCenter that has no
// subscribers yet.
func NewNotificationCenter() *NotificationCenter {
	center := new(NotificationCenter)
	center.subscribers = make(map[chan []byte]struct{})
	center.subscribersMu = new(sync.Mutex)
	return center
}
// Subscribe adds c to the set of channels that Notify delivers to. The
// returned function removes c again; it always returns nil and may be called
// more than once. Subscribe itself never returns a non-nil error.
func (nc *NotificationCenter) Subscribe(c chan []byte) (UnsubscribeFunc, error) {
	var member struct{}

	nc.subscribersMu.Lock()
	nc.subscribers[c] = member
	nc.subscribersMu.Unlock()

	var cancel UnsubscribeFunc = func() error {
		nc.subscribersMu.Lock()
		defer nc.subscribersMu.Unlock()

		delete(nc.subscribers, c)
		return nil
	}
	return cancel, nil
}
// Notify offers b to every subscribed channel without blocking: a subscriber
// that cannot accept the value right now simply misses it. The subscriber set
// is locked for the duration of the call. Notify always returns nil.
func (nc *NotificationCenter) Notify(b []byte) error {
	// offer attempts a single non-blocking send of b to dst.
	offer := func(dst chan<- []byte) {
		select {
		case dst <- b:
		default:
		}
	}

	nc.subscribersMu.Lock()
	defer nc.subscribersMu.Unlock()

	for sub := range nc.subscribers {
		offer(sub)
	}
	return nil
}
@dougneal
Copy link

This is exactly what I needed right now. Thank you!

@swilcox
Copy link

swilcox commented Jan 26, 2021

Definitely a huge help. Thanks!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment