Skip to content

Instantly share code, notes, and snippets.

@rikonor
Created January 19, 2019 02:43

Revisions

  1. rikonor created this gist Jan 19, 2019.
    115 changes: 115 additions & 0 deletions main.go
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,115 @@
    package main

    import (
    "fmt"
    "log"
    "net/http"
    "sync"
    "time"
    )

    func main() {
    nc := NewNotificationCenter()

    go func() {
    for {
    b := []byte(time.Now().Format(time.RFC3339))
    if err := nc.Notify(b); err != nil {
    log.Fatal(err)
    }

    time.Sleep(1 * time.Second)
    }
    }()

    http.HandleFunc("/sse", handleSSE(nc))
    http.ListenAndServe(":8001", nil)
    }

    type UnsubscribeFunc func() error

    type Subscriber interface {
    Subscribe(c chan []byte) (UnsubscribeFunc, error)
    }

    func handleSSE(s Subscriber) http.HandlerFunc {
    return func(w http.ResponseWriter, r *http.Request) {
    // Subscribe
    c := make(chan []byte)
    unsubscribeFn, err := s.Subscribe(c)
    if err != nil {
    http.Error(w, err.Error(), http.StatusInternalServerError)
    return
    }

    // Signal SSE Support
    w.Header().Set("Content-Type", "text/event-stream")
    w.Header().Set("Cache-Control", "no-cache")
    w.Header().Set("Connection", "keep-alive")
    w.Header().Set("Access-Control-Allow-Origin", "*")

    Looping:
    for {
    select {
    case <-r.Context().Done():
    if err := unsubscribeFn(); err != nil {
    http.Error(w, err.Error(), http.StatusInternalServerError)
    return
    }
    break Looping

    default:
    b := <-c
    fmt.Fprintf(w, "data: %s\n\n", b)

    w.(http.Flusher).Flush()
    }
    }
    }
    }

    type Notifier interface {
    Notify(b []byte) error
    }

    type NotificationCenter struct {
    subscribers map[chan []byte]struct{}
    subscribersMu *sync.Mutex
    }

    func NewNotificationCenter() *NotificationCenter {
    return &NotificationCenter{
    subscribers: map[chan []byte]struct{}{},
    subscribersMu: &sync.Mutex{},
    }
    }

    func (nc *NotificationCenter) Subscribe(c chan []byte) (UnsubscribeFunc, error) {
    nc.subscribersMu.Lock()
    nc.subscribers[c] = struct{}{}
    nc.subscribersMu.Unlock()

    unsubscribeFn := func() error {
    nc.subscribersMu.Lock()
    delete(nc.subscribers, c)
    nc.subscribersMu.Unlock()

    return nil
    }

    return unsubscribeFn, nil
    }

    func (nc *NotificationCenter) Notify(b []byte) error {
    nc.subscribersMu.Lock()
    defer nc.subscribersMu.Unlock()

    for c := range nc.subscribers {
    select {
    case c <- b:
    default:
    }
    }

    return nil
    }