Created
October 8, 2018 14:08
-
-
Save keegancsmith/405acb61311ca84e32982ea05a999ddb to your computer and use it in GitHub Desktop.
Example of how k8s sends protobuf responses for GET and GET watches
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
module k8sgetexample

require (
	github.com/ericchiang/k8s v1.2.0
	github.com/golang/protobuf v1.2.0
	golang.org/x/net v0.0.0-20181005035420-146acd28ed58 // indirect
	golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f // indirect
	golang.org/x/text v0.3.0 // indirect
)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// This file contains an example of how kubernetes marshals objects for simple | |
// GET requests, as well as GET requests which have watch=1. | |
package main | |
import ( | |
"context" | |
"encoding/binary" | |
"fmt" | |
"io" | |
"log" | |
"net/http" | |
"net/http/httptest" | |
"strconv" | |
"github.com/ericchiang/k8s" | |
corev1 "github.com/ericchiang/k8s/apis/core/v1" | |
"github.com/ericchiang/k8s/runtime" | |
"github.com/ericchiang/k8s/watch/versioned" | |
"github.com/golang/protobuf/proto" | |
) | |
func mkEndpoints(ips ...string) *corev1.Endpoints { | |
if len(ips) == 0 { | |
return &corev1.Endpoints{} | |
} | |
var addrs []*corev1.EndpointAddress | |
for _, ip := range ips { | |
addrs = append(addrs, &corev1.EndpointAddress{Ip: &ip}) | |
} | |
return &corev1.Endpoints{ | |
Subsets: []*corev1.EndpointSubset{{ | |
Addresses: addrs, | |
}}, | |
} | |
} | |
func marshal(pb proto.Message) []byte { | |
b, err := proto.Marshal(pb) | |
if err != nil { | |
log.Fatal(err) | |
} | |
return b | |
} | |
func main() { | |
log.SetFlags(log.LstdFlags | log.Lshortfile) | |
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { | |
// I am ignoring the request body for this test, and just assuming it is a get | |
// request on endpoints. | |
w.Header().Set("Content-Type", "application/vnd.kubernetes.protobuf") | |
// Assuming get request | |
if watch, _ := strconv.ParseBool(r.URL.Query().Get("watch")); !watch { | |
// Magic bytes for a k8s protobuf object | |
w.Write([]byte{0x6b, 0x38, 0x73, 0x00}) | |
w.Write(marshal(mkEndpoints("10.0.0.1", "10.0.0.2", "10.0.0.3"))) | |
return | |
} | |
sendEvent := func(event string, pb proto.Message) { | |
// Unknown allows api objects with unknown types to be | |
// passed-through. There should be many fields set, but the client | |
// I am using only reads Raw, which is the object we are sending. | |
unknown := runtime.Unknown{ | |
Raw: marshal(pb), | |
} | |
raw := runtime.RawExtension{ | |
// This is the type of []byte's in k8s, so is the value of | |
// "Object" in the event we send. The first few bytes are the | |
// magic bytes indicating the rest of the byte slice is of | |
// Unknown type. | |
Raw: append([]byte{0x6b, 0x38, 0x73, 0x00}, marshal(&unknown)...), | |
} | |
// This is the actual object we write. We marshal it first so we | |
// can know its size. | |
b := marshal(&versioned.Event{ | |
Type: &event, | |
Object: &raw, | |
}) | |
// Send the size down | |
if err := binary.Write(w, binary.BigEndian, uint32(len(b))); err != nil { | |
log.Fatal(err) | |
} | |
// Write the object | |
if _, err := w.Write(b); err != nil { | |
log.Fatal(err) | |
} | |
// Ensure the bytes are actually written out. | |
w.(http.Flusher).Flush() | |
} | |
// What follos is some fake events on endpoints. | |
sendEvent(k8s.EventAdded, mkEndpoints("10.0.0.1", "10.0.0.2", "10.0.0.3")) | |
sendEvent(k8s.EventModified, mkEndpoints("10.0.0.1", "10.0.0.2", "10.0.0.3", "10.0.0.4")) | |
sendEvent(k8s.EventModified, mkEndpoints()) | |
sendEvent(k8s.EventModified, mkEndpoints("10.0.0.5")) | |
// Deleted event sends the state of the object just before deleting. | |
sendEvent(k8s.EventDeleted, mkEndpoints("10.0.0.5")) | |
})) | |
defer ts.Close() | |
client := &k8s.Client{ | |
Endpoint: ts.URL, | |
Namespace: "default", | |
} | |
ctx := context.Background() | |
// Just a normal get | |
var endpoints corev1.Endpoints | |
err := client.Get(ctx, client.Namespace, "searcher", &endpoints) | |
if err != nil { | |
log.Fatal(err) | |
} | |
fmt.Println(endpoints) | |
// Now an example of watch | |
watcher, err := client.Watch(ctx, client.Namespace, new(corev1.Endpoints), k8s.QueryParam("fieldSelector", "metadata.name="+"searcher")) | |
if err != nil { | |
log.Fatal(err) | |
} | |
defer watcher.Close() | |
for { | |
var endpoints corev1.Endpoints | |
eventType, err := watcher.Next(&endpoints) | |
if err == io.EOF { | |
// Normally we would just reconnect, but in this case we cancel | |
// since it is a test. | |
break | |
} else if err != nil { | |
log.Fatal(err) | |
} | |
fmt.Println(eventType, endpoints) | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment