Skip to content

Instantly share code, notes, and snippets.

@keegancsmith
Created October 8, 2018 14:08
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save keegancsmith/405acb61311ca84e32982ea05a999ddb to your computer and use it in GitHub Desktop.
Save keegancsmith/405acb61311ca84e32982ea05a999ddb to your computer and use it in GitHub Desktop.
Example of how k8s sends protobuf responses for GET and GET watches
module k8sgetexample
require (
github.com/ericchiang/k8s v1.2.0
github.com/golang/protobuf v1.2.0
golang.org/x/net v0.0.0-20181005035420-146acd28ed58 // indirect
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f // indirect
golang.org/x/text v0.3.0 // indirect
)
// This file contains an example of how kubernetes marshals objects for simple
// GET requests, as well as GET requests which have watch=1.
package main
import (
"context"
"encoding/binary"
"fmt"
"io"
"log"
"net/http"
"net/http/httptest"
"strconv"
"github.com/ericchiang/k8s"
corev1 "github.com/ericchiang/k8s/apis/core/v1"
"github.com/ericchiang/k8s/runtime"
"github.com/ericchiang/k8s/watch/versioned"
"github.com/golang/protobuf/proto"
)
func mkEndpoints(ips ...string) *corev1.Endpoints {
if len(ips) == 0 {
return &corev1.Endpoints{}
}
var addrs []*corev1.EndpointAddress
for _, ip := range ips {
addrs = append(addrs, &corev1.EndpointAddress{Ip: &ip})
}
return &corev1.Endpoints{
Subsets: []*corev1.EndpointSubset{{
Addresses: addrs,
}},
}
}
func marshal(pb proto.Message) []byte {
b, err := proto.Marshal(pb)
if err != nil {
log.Fatal(err)
}
return b
}
func main() {
log.SetFlags(log.LstdFlags | log.Lshortfile)
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// I am ignoring the request body for this test, and just assuming it is a get
// request on endpoints.
w.Header().Set("Content-Type", "application/vnd.kubernetes.protobuf")
// Assuming get request
if watch, _ := strconv.ParseBool(r.URL.Query().Get("watch")); !watch {
// Magic bytes for a k8s protobuf object
w.Write([]byte{0x6b, 0x38, 0x73, 0x00})
w.Write(marshal(mkEndpoints("10.0.0.1", "10.0.0.2", "10.0.0.3")))
return
}
sendEvent := func(event string, pb proto.Message) {
// Unknown allows api objects with unknown types to be
// passed-through. There should be many fields set, but the client
// I am using only reads Raw, which is the object we are sending.
unknown := runtime.Unknown{
Raw: marshal(pb),
}
raw := runtime.RawExtension{
// This is the type of []byte's in k8s, so is the value of
// "Object" in the event we send. The first few bytes are the
// magic bytes indicating the rest of the byte slice is of
// Unknown type.
Raw: append([]byte{0x6b, 0x38, 0x73, 0x00}, marshal(&unknown)...),
}
// This is the actual object we write. We marshal it first so we
// can know its size.
b := marshal(&versioned.Event{
Type: &event,
Object: &raw,
})
// Send the size down
if err := binary.Write(w, binary.BigEndian, uint32(len(b))); err != nil {
log.Fatal(err)
}
// Write the object
if _, err := w.Write(b); err != nil {
log.Fatal(err)
}
// Ensure the bytes are actually written out.
w.(http.Flusher).Flush()
}
// What follos is some fake events on endpoints.
sendEvent(k8s.EventAdded, mkEndpoints("10.0.0.1", "10.0.0.2", "10.0.0.3"))
sendEvent(k8s.EventModified, mkEndpoints("10.0.0.1", "10.0.0.2", "10.0.0.3", "10.0.0.4"))
sendEvent(k8s.EventModified, mkEndpoints())
sendEvent(k8s.EventModified, mkEndpoints("10.0.0.5"))
// Deleted event sends the state of the object just before deleting.
sendEvent(k8s.EventDeleted, mkEndpoints("10.0.0.5"))
}))
defer ts.Close()
client := &k8s.Client{
Endpoint: ts.URL,
Namespace: "default",
}
ctx := context.Background()
// Just a normal get
var endpoints corev1.Endpoints
err := client.Get(ctx, client.Namespace, "searcher", &endpoints)
if err != nil {
log.Fatal(err)
}
fmt.Println(endpoints)
// Now an example of watch
watcher, err := client.Watch(ctx, client.Namespace, new(corev1.Endpoints), k8s.QueryParam("fieldSelector", "metadata.name="+"searcher"))
if err != nil {
log.Fatal(err)
}
defer watcher.Close()
for {
var endpoints corev1.Endpoints
eventType, err := watcher.Next(&endpoints)
if err == io.EOF {
// Normally we would just reconnect, but in this case we cancel
// since it is a test.
break
} else if err != nil {
log.Fatal(err)
}
fmt.Println(eventType, endpoints)
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment