Skip to content

Instantly share code, notes, and snippets.

@trusch
Created December 6, 2018 13:56
Show Gist options
  • Save trusch/03ba95d6ec68fd11f3bfdc291e5431e9 to your computer and use it in GitHub Desktop.
Save trusch/03ba95d6ec68fd11f3bfdc291e5431e9 to your computer and use it in GitHub Desktop.
package events
import (
"encoding/json"
"io"
"net/http"
"github.com/joomcode/errorx"
"github.com/nats-io/go-nats-streaming"
uuid "github.com/satori/go.uuid"
"github.com/sirupsen/logrus"
"golang.org/x/net/websocket"
)
// Handler is an http.Handler that serves websocket requests, letting clients
// subscribe to and unsubscribe from event streams.
type Handler struct {
// natsClient is the NATS streaming connection used to create the subscriptions of every websocket connection
natsClient stan.Conn
}
// NewHandler creates a new http handler
// it takes the address of the nats server and the cluster id of the nats-streaming server as input
func NewHandler(addr, clusterID string) (http.Handler, error) {
cli, err := stan.Connect(clusterID, uuid.NewV4().String(), stan.NatsURL(addr))
if err != nil {
return nil, errorx.Decorate(err, "failed to connect to nats")
}
return &Handler{cli}, nil
}
// ServeHTTP implements the http.Handler interface by delegating the request
// to a websocket handler that invokes Handle for the resulting connection.
func (handler *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	wsHandler := websocket.Handler(handler.Handle)
	wsHandler.ServeHTTP(w, r)
}
// Handle serves one successfully upgraded websocket connection: it wraps the
// socket in a Connection with an empty subscription table and runs that
// connection's request loop until it returns.
func (handler *Handler) Handle(ws *websocket.Conn) {
	logrus.Infof("handle new client")
	defer logrus.Infof("finished handling client")

	conn := &Connection{
		handler:       handler,
		ws:            ws,
		subscriptions: make(map[string]stan.Subscription),
		done:          make(chan struct{}),
	}
	conn.Handle()
}
// Connection represents a single websocket connection together with the
// NATS subscriptions opened on its behalf.
type Connection struct {
// handler is the Handler that accepted the connection; its NATS client is used to subscribe
handler *Handler
// ws is the upgraded websocket: requests are read from it and events are written to it
ws *websocket.Conn
// subscriptions maps a channel name to the active subscription for that channel
subscriptions map[string]stan.Subscription
// done is allocated when the Connection is created.
// NOTE(review): nothing in the visible code sends on, receives from, or closes it — confirm whether it is still needed
done chan struct{}
}
// Handle runs the request loop of a single websocket connection.
//
// It continuously reads JSON requests from the client:
//
//	subscribe request:   { type: 'subscribe', channel: 'example', seq: 123 }
//	unsubscribe request: { type: 'unsubscribe', channel: 'example' }
//
// Every request whose type is not 'subscribe' is treated as an unsubscribe.
// While the client is subscribed to a channel, it receives all events of that
// channel starting with the given sequence number, plus new events as they
// occur.
//
// The loop ends when the websocket can no longer be read, when a request is
// not valid JSON, or when subscribing fails. All subscriptions still open at
// that point are closed before Handle returns.
func (c *Connection) Handle() {
	// Release whatever is still subscribed when the loop ends, no matter why
	// it ended; otherwise the subscriptions would outlive the client.
	defer c.closeSubscriptions()

	requestBuf := make([]byte, 4096)
	for {
		// read and parse the client request
		n, err := c.ws.Read(requestBuf)
		if err != nil {
			// io.EOF is the regular end of the connection and is not logged
			if err != io.EOF {
				logrus.Error(err)
			}
			return
		}
		req := &clientRequest{}
		if err = json.Unmarshal(requestBuf[:n], req); err != nil {
			logrus.Error(err)
			return
		}
		logrus.Infof("got new client request: %+v", req)

		if req.Type != "subscribe" {
			// client wants to unsubscribe -> close the subscription if any
			c.cleanSubscription(req.Channel)
			logrus.Infof("unsubscribed client from %v", req.Channel)
			continue
		}

		// client wants a new subscription
		if _, ok := c.subscriptions[req.Channel]; ok {
			logrus.Warnf("already subscribed to %v", req.Channel)
			continue
		}
		s, err := c.handler.natsClient.Subscribe(
			req.Channel,
			c.eventForwarder(req.Channel),
			stan.StartAtSequence(req.Seq),
		)
		if err != nil {
			// subscribing failed, most likely because the channel identifier
			// is malformed; stop here so that no nil subscription is stored
			// and no success is reported
			logrus.Error(err)
			c.ws.Close()
			return
		}
		c.subscriptions[req.Channel] = s
		logrus.Infof("successfully subscribed client to %v", req.Channel)
	}
}

// eventForwarder returns the message callback for a subscription to channel:
// it wraps each message in an event and writes it to the websocket.
//
// On any failure the callback only closes the websocket. That makes the read
// loop in Handle fail and return, and Handle then closes the subscriptions.
// The callback deliberately never touches c.subscriptions, so the map is only
// accessed by the goroutine running Handle.
func (c *Connection) eventForwarder(channel string) func(*stan.Msg) {
	return func(m *stan.Msg) {
		evt := &event{
			Channel: channel,
			Seq:     m.Sequence,
		}
		if err := json.Unmarshal(m.Data, &evt.Data); err != nil {
			// the event data is not valid json, we require that!
			logrus.Error(err)
			c.ws.Close()
			return
		}
		payload, err := json.Marshal(evt)
		if err != nil {
			// not expected for data that was just decoded from JSON
			logrus.Error(err)
			c.ws.Close()
			return
		}
		if _, err = c.ws.Write(payload); err != nil {
			// it seems like the client closed the connection
			logrus.Error(err)
			c.ws.Close()
		}
	}
}

// closeSubscriptions closes every subscription that is still registered.
func (c *Connection) closeSubscriptions() {
	// deleting entries while ranging over a map is permitted in Go
	for channel := range c.subscriptions {
		c.cleanSubscription(channel)
	}
}
// cleanSubscription closes the subscription registered for topic, if any, and
// removes it from the subscriptions map. It does nothing when topic has no
// entry. A failure to close the subscription is logged; the entry is removed
// either way.
func (c *Connection) cleanSubscription(topic string) {
	s, ok := c.subscriptions[topic]
	if !ok {
		return
	}
	// A nil entry has nothing to close; calling Close on it would panic.
	if s != nil {
		if err := s.Close(); err != nil {
			logrus.Errorf("failed to close subscription to %v: %v", topic, err)
		}
	}
	delete(c.subscriptions, topic)
}
// clientRequest is the structure of the requests the client sends over the websocket.
type clientRequest struct {
// Type selects the action: "subscribe" opens a subscription; Connection.Handle treats every other value as an unsubscribe
Type string `json:"type"`
// Channel is the name of the channel the request refers to
Channel string `json:"channel"`
// Seq is the sequence number a new subscription starts at; it is not read for unsubscribe requests
Seq uint64 `json:"seq"`
}
// event is the structure used by the server to report events to the client.
type event struct {
// Channel is the name of the channel the event was received on
Channel string `json:"channel"`
// Data is the event payload, decoded from the message body; the body has to decode as a JSON object
Data map[string]interface{} `json:"data"`
// Seq is the sequence number of the message that carried the event
Seq uint64 `json:"seq"`
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment