Skip to content

Instantly share code, notes, and snippets.

@maestre3d
Created July 25, 2020 17:15
Show Gist options
  • Save maestre3d/4a42e8fa552694f7c97c4811ce913e23 to your computer and use it in GitHub Desktop.
Save maestre3d/4a42e8fa552694f7c97c4811ce913e23 to your computer and use it in GitHub Desktop.
Server Sent Events (SSE) broker written in Go
package sse
import (
"encoding/json"
"fmt"
"github.com/alexandria-oss/core"
"github.com/go-kit/kit/log"
"net/http"
"sync"
)
// Event is the unit of data the broker delivers to its consumers over SSE.
type Event struct {
// ID identifies this event; NewEvent fills it with a freshly generated ID.
ID uint64
// Message is the raw event payload.
Message []byte
// Consumer is the target consumer's distributed ID. Publish treats a value
// of zero as "deliver to every consumer".
Consumer uint64
}
// NewEvent builds an Event carrying msg and addressed to consumer c, stamping
// it with a newly generated event ID. The msg slice is stored as-is, not
// copied.
func NewEvent(msg []byte, c uint64) Event {
	var e Event
	// Every event gets its own unique ID.
	e.ID = core.NewSonyflakeID()
	e.Message = msg
	e.Consumer = c
	return e
}
// String renders the event as "<id>: [<message>,<consumer>)" for use in logs
// and as the plain-text fallback payload.
func (e Event) String() string {
	body := string(e.Message)
	return fmt.Sprintf("%d: [%s,%d)", e.ID, body, e.Consumer)
}
// Broker manages all SSE event transactions and contains a consumer pool.
type Broker struct {
// consumers maps each subscriber's channel to the distributed ID that
// Subscribe assigned to that client.
consumers map[chan Event]uint64
// logger receives the broker's connect, disconnect and publish messages.
logger log.Logger
// mtx is held by Subscribe, Unsubscribe and Publish while they access
// consumers.
mtx *sync.Mutex
}
// NewBroker creates a ready-to-use SSE broker with an empty consumer pool
// that reports its activity through logger.
func NewBroker(logger log.Logger) *Broker {
	b := new(Broker)
	b.logger = logger
	b.mtx = &sync.Mutex{}
	b.consumers = map[chan Event]uint64{}
	return b
}
// Subscribe registers a new consumer with the broker and returns the
// unbuffered channel on which that consumer receives events. A unique
// consumer ID is generated and stored alongside the channel, and the
// connection is logged.
func (b *Broker) Subscribe() chan Event {
	b.mtx.Lock()
	defer b.mtx.Unlock()

	ch := make(chan Event)
	// Every client gets its own unique ID.
	clientID := core.NewSonyflakeID()
	b.consumers[ch] = clientID

	logMsg := fmt.Sprintf("client %d connected", clientID)
	_ = b.logger.Log("resource", "transport.eventsource.broker", "msg", logMsg)
	return ch
}
// Unsubscribe removes a consumer from the broker's pool and closes its
// channel.
//
// Publish sends on consumer channels while holding the broker mutex. A caller
// that has already stopped receiving from c would therefore wait forever for
// the mutex while Publish waits forever for a receiver. To break that cycle,
// c is drained in the background until the mutex has been acquired; events
// drained this way are discarded.
//
// Passing a channel that is not (or no longer) registered is a no-op, so
// calling Unsubscribe after Close, or more than once, does not panic.
func (b *Broker) Unsubscribe(c chan Event) {
	stop := make(chan struct{})
	go func() {
		for {
			select {
			case _, open := <-c:
				if !open {
					return
				}
			case <-stop:
				return
			}
		}
	}()
	// Deferred first so it runs last: the drainer is stopped only after the
	// mutex has been released, even when c was never closed by this call.
	defer close(stop)

	b.mtx.Lock()
	defer b.mtx.Unlock()

	id, ok := b.consumers[c]
	if !ok {
		// Unknown or already removed; closing c again would panic.
		return
	}
	delete(b.consumers, c)
	close(c)
	_ = b.logger.Log("resource", "transport.eventsource.broker", "msg",
		fmt.Sprintf("client %d killed, %d remaining", id, len(b.consumers)))
}
// Publish delivers e to consumers and logs how many received it.
//
// When e.Consumer is greater than zero the event goes only to the consumer
// with that ID (if one is registered). Otherwise it is broadcast: every
// consumer receives a copy whose Consumer field is set to its own ID.
//
// The broker mutex is held for the whole call, including the channel sends,
// so Publish blocks until each targeted consumer has accepted the event.
func (b *Broker) Publish(e Event) {
	b.mtx.Lock()
	defer b.mtx.Unlock()

	delivered := 0
	targeted := e.Consumer > 0
	for ch, consumerID := range b.consumers {
		if !targeted {
			// Broadcast: address a copy to this consumer, leaving e untouched.
			out := e
			out.Consumer = consumerID
			ch <- out
			delivered++
			continue
		}
		if consumerID != e.Consumer {
			continue
		}
		// Found the single addressee; nothing else to do afterwards.
		ch <- e
		delivered++
		break
	}

	summary := fmt.Sprintf("published message to %d subscribers", delivered)
	_ = b.logger.Log("resource", "transport.eventsource.broker", "msg", summary)
}
// Close closes every remaining consumer channel and empties the broker's
// pool.
//
// The consumer map is shared with Subscribe, Unsubscribe and Publish, all of
// which access it under the broker mutex, so Close takes the same mutex to
// avoid racing with them.
func (b *Broker) Close() {
	b.mtx.Lock()
	defer b.mtx.Unlock()

	for c := range b.consumers {
		close(c)
		// Deleting the current key while ranging over a map is permitted.
		delete(b.consumers, c)
	}
}
// ServeHTTP subscribes the incoming HTTP request to the broker and streams
// every event the subscription receives as Server-Sent Events until the
// request context is cancelled or the subscription channel is closed.
//
// It responds with 500 when the ResponseWriter cannot flush (streaming is
// impossible). Each event is written as one "data:" line holding a JSON
// object with the fields event_id, message and consumer_id. The first event
// published to a new client has no message and tells the client its consumer
// ID.
func (b *Broker) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	f, ok := w.(http.Flusher)
	if !ok {
		http.Error(w, "streaming is not supported", http.StatusInternalServerError)
		return
	}

	h := w.Header()
	h.Set("Connection", "keep-alive")
	h.Set("Content-Type", "text/event-stream")
	h.Set("Cache-Control", "no-cache")
	h.Set("Access-Control-Allow-Origin", "*")

	// Create a new client channel for streamed events.
	c := b.Subscribe()
	defer b.Unsubscribe(c)

	// The consumer map is guarded by the broker mutex; read the new client's
	// ID under it rather than racing with other subscribers.
	b.mtx.Lock()
	id := b.consumers[c]
	b.mtx.Unlock()

	// Send the client its new ID. Publish blocks until this handler receives
	// the event in the loop below, so it has to run in its own goroutine.
	go b.Publish(NewEvent(nil, id))

	ctx := r.Context()
	for {
		select {
		case msg, open := <-c:
			if !open {
				// The broker closed the subscription. Without this check the
				// loop would spin, emitting zero-value events forever.
				return
			}
			// text/event-stream framing ("data: ...\n\n"); do not alter.
			msgJSON, err := json.Marshal(struct {
				ID       uint64 `json:"event_id"`
				Message  string `json:"message"`
				Consumer uint64 `json:"consumer_id"`
			}{msg.ID, string(msg.Message), msg.Consumer})
			if err != nil {
				// Fall back to the event's plain-text form.
				_, _ = fmt.Fprintf(w, "data: %s\n\n", msg)
			} else {
				_, _ = fmt.Fprintf(w, "data: %s\n\n", msgJSON)
			}
			f.Flush()
		case <-ctx.Done():
			return
		}
	}
}
@frederikhors
Copy link

I'm having trouble using it like this:

go func() {
    for {
        time.Sleep(time.Second * 1)
        broker.Publish(NewEvent([]byte("first"), 0))
        broker.Publish(NewEvent([]byte("second"), 0))
    }
}()

When I use this (2 or more events), I cannot Unsubscribe anymore; my app is frozen.

Why?

Is there a way to fix it?

@maestre3d
Copy link
Author

I'm having trouble using it like this:

go func() {
    for {
        time.Sleep(time.Second * 1)
        broker.Publish(NewEvent([]byte("first"), 0))
        broker.Publish(NewEvent([]byte("second"), 0))
    }
}()

When I use this (2 or more events), I cannot Unsubscribe anymore; my app is frozen.

Why?

Is there a way to fix it?

Sorry, this was just a test. Right now I am working on a project called Gluon. It contains an in-memory bus that could solve issues like this one.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment