Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
HTTP <> GRPC Cortex Distributors
package main
import (
"fmt"
"log"
"net/http"
"github.com/gorilla/mux"
"github.com/weaveworks/common/httpgrpc"
"github.com/weaveworks/common/httpgrpc/server"
"github.com/weaveworks/common/middleware"
"github.com/weaveworks/common/user"
"google.golang.org/grpc"
)
// grpcServiceConfig is the default gRPC service config (JSON) handed to
// grpc.WithDefaultServiceConfig in main; it selects the "round_robin"
// load-balancing policy.
const grpcServiceConfig = `{"loadBalancingPolicy":"round_robin"}`
// grpcProxy is an http.Handler (see ServeHTTP) that forwards each incoming
// HTTP request through an httpgrpc client.
type grpcProxy struct {
// client sends the converted requests over the gRPC connection built in main.
client httpgrpc.HTTPClient
}
// main starts an HTTP server that accepts push requests and forwards them to
// a distributor over gRPC using the httpgrpc protocol.
func main() {
	// If this DNS entry returns a list of IPs, grpcServiceConfig makes gRPC
	// attempt to load balance over them.
	distributorHostPort := "localhost:9001"
	listenHostPort := "127.0.0.1:8000"
	enableAuth := false

	// gRPC client (sends to the distributor).
	dialOptions := []grpc.DialOption{
		grpc.WithDefaultServiceConfig(grpcServiceConfig),
		grpc.WithInsecure(),
	}
	if enableAuth {
		// Extracts the org ID from the Go context and adds it to the outgoing
		// gRPC headers.
		dialOptions = append(dialOptions, grpc.WithUnaryInterceptor(middleware.ClientUserHeaderInterceptor))
	}
	conn, err := grpc.Dial(distributorHostPort, dialOptions...)
	if err != nil {
		log.Fatalf("dialing distributor %q: %v", distributorHostPort, err)
	}
	proxy := &grpcProxy{
		client: httpgrpc.NewHTTPClient(conn),
	}

	// HTTP server (receives pushes from clients).
	// Note: the handler is registered on two common paths. Use whichever makes
	// sense for your config.
	r := mux.NewRouter()
	r.HandleFunc("/api/v1/push", proxy.ServeHTTP)
	r.HandleFunc("/api/prom/push", proxy.ServeHTTP)

	// The router is handed to the server directly; it is deliberately not
	// registered on http.DefaultServeMux, which this program never serves.
	var handler http.Handler = r
	if enableAuth {
		// HTTPBasicAuthToCortex: basic auth -> X-Scope-OrgID.
		// AuthenticateUser: extracts the incoming X-Scope-OrgID HTTP header and
		// adds it to the Go context.
		handler = HTTPBasicAuthToCortex(middleware.AuthenticateUser(handler))
	}
	srv := &http.Server{
		Handler: handler,
		Addr:    listenHostPort,
	}
	fmt.Println("listening...")
	log.Fatal(srv.ListenAndServe())
}
// ServeHTTP implements http.Handler. It converts the incoming HTTP request
// into an httpgrpc request, sends it through the gRPC client, and writes the
// resulting response back to the caller.
//
// Every failure path writes an explicit error status; returning without
// writing anything would make net/http reply with an empty 200 OK, telling the
// client that a failed push succeeded.
func (g *grpcProxy) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	req, err := server.HTTPRequest(r)
	if err != nil {
		log.Printf("converting HTTP request to httpgrpc: %v", err)
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}

	resp, err := g.client.Handle(r.Context(), req)
	if err != nil {
		// Some errors actually carry a valid HTTP response; unpack it and relay
		// it to the caller as-is.
		var ok bool
		resp, ok = httpgrpc.HTTPResponseFromError(err)
		if !ok {
			// No embedded response: the upstream call itself failed. Log the
			// detail locally and give the client a generic gateway error.
			log.Printf("forwarding request over gRPC: %v", err)
			http.Error(w, http.StatusText(http.StatusBadGateway), http.StatusBadGateway)
			return
		}
	}

	if err := server.WriteResponse(w, resp); err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
}
// HTTPBasicAuthToCortex returns middleware that maps HTTP basic auth onto the
// org ID header: the basic-auth username is copied into the
// user.OrgIDHeaderName request header before next is invoked.
//
// The password is discarded and is NOT validated here. Requests without
// parseable basic-auth credentials are rejected with 401 Unauthorized and a
// WWW-Authenticate challenge, and next is not called.
func HTTPBasicAuthToCortex(next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		u, _, ok := r.BasicAuth()
		if !ok {
			// A 401 response must tell the client which auth scheme to use.
			w.Header().Set("WWW-Authenticate", `Basic realm="Restricted"`)
			http.Error(w, "basic auth required", http.StatusUnauthorized)
			return
		}
		// Set (not Add) so any org ID header supplied by the client is replaced.
		r.Header.Set(user.OrgIDHeaderName, u)
		next.ServeHTTP(w, r)
	})
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment