Skip to content

Instantly share code, notes, and snippets.

@trusch
Created December 6, 2018 16:09
Show Gist options
  • Save trusch/790af091d0496cf66def95d66de1886c to your computer and use it in GitHub Desktop.
Save trusch/790af091d0496cf66def95d66de1886c to your computer and use it in GitHub Desktop.
package http
import (
"context"
"encoding/json"
"io"
"net/http"
"strconv"
"strings"
"time"
"github.com/contiamo/event-bus/pkg/events"
)
// LongPollingHandler is an http handler which serves event streams over websocket
type LongPollingHandler struct {
cli events.ControllerClient
}
func (handler *LongPollingHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
channel := r.URL.Query()["channel"]
if channel == "" {
w.WriteHeader(http.StatusBadRequest)
return
}
seqStr := GetLongPollingIndex(r)
seq, err := strconv.ParseInt(seqStr, 10, 64)
if err != nil {
seq = 0
}
pollDuration := GetLongPollDuration(r)
ctx, cancel := context.WithTimeout(r.Context(), pollDuration)
defer cancel()
stream, err := handler.cli.Subscribe(ctx, &events.SubscribeRequest{
Channel: channel,
Seq: seq,
})
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
return
}
res := make([]*event.Event, 0)
for {
evt, err := stream.Recv()
if err != nil {
if err == io.EOF {
break
}
w.WriteHeader(http.StatusInternalServerError)
return
}
res = append(res, evt)
}
if len(res) > 0 {
SetLongPollingIndex(w, strconv.Itoa(res[len(res)-1].Seq))
json.NewEncoder(w).Encode(res)
return
}
WriteLongPollingTimeout(w)
}
const (
// LabsLongPollingHeaderKey is the header value used to send the long polling position
// index value. This value will be an encoded value. Getter and Setter helpers are
// provided for handling this value without needing to deal with encoding/decoding
// directly.
LabsLongPollingHeaderKey = "X-Polling-Index"
)
// GetLongPollingIndex extracts and decodes the long-polling index value from the
// hex encoded GET parameter. If the value is invalid or missing, this will return
// the empty string
func GetLongPollingIndex(r *http.Request) string {
// check the GET parameter and the Prefer header
index := r.URL.Query().Get("index")
if index != "" {
return MustDecodeString(index)
}
prefer := r.Header["Prefer"]
for _, x := range prefer {
x = strings.TrimSpace(x)
preferences := strings.Split(x, ";")
for _, p := range preferences {
p = strings.TrimSpace(p)
if !strings.HasPrefix(p, "index") {
continue
}
parts := strings.Split(p, "=")
if len(parts) > 1 {
index = parts[1]
return MustDecodeString(index)
}
}
}
return ""
}
// SetLongPollingIndex sets the `X-Polling-Index` and
// `X-Polling-Update-Strategy` headers on the ResponsWriter.
// If the index is the empty string, it is a no-op.
func SetLongPollingIndex(w http.ResponseWriter, index string) {
if index == "" {
return
}
w.Header().Set(LabsLongPollingHeaderKey, EncodeToString(index))
}
// WriteLongPollingTimeout writes the timeout status code to the ResponseWriter
func WriteLongPollingTimeout(w http.ResponseWriter) {
w.WriteHeader(http.StatusNotModified)
}
// GetLongPollDuration parses the long-polling Prefer header and returns the value
// as a duration. The return value will be bound between a min of 15 and max of 60 seconds.
func GetLongPollDuration(r *http.Request) time.Duration {
return time.Duration(GetRequestTimeout(r)) * time.Second
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment