Skip to content

Instantly share code, notes, and snippets.

@cyriltovena
Last active September 15, 2020 13:29
Show Gist options
  • Save cyriltovena/78cddf6ce7be11f62876b8655d2681a6 to your computer and use it in GitHub Desktop.
Save cyriltovena/78cddf6ce7be11f62876b8655d2681a6 to your computer and use it in GitHub Desktop.
ingester kube port forward
package main
import (
"bytes"
"context"
"flag"
"fmt"
"io"
"log"
"net/http"
"net/url"
"os"
"path/filepath"
"strings"
"time"
"github.com/cortexproject/cortex/pkg/ingester/client"
"github.com/prometheus/common/model"
"github.com/weaveworks/common/user"
"google.golang.org/grpc"
_ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/tools/portforward"
"k8s.io/client-go/transport/spdy"
"k8s.io/client-go/util/homedir"
)
func main() {
log.SetOutput(os.Stdout)
var kubeconfig *string
if home := homedir.HomeDir(); home != "" {
kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "(optional) absolute path to the kubeconfig file")
} else {
kubeconfig = flag.String("kubeconfig", "", "absolute path to the kubeconfig file")
}
flag.Parse()
config, err := clientcmd.BuildConfigFromFlags("", *kubeconfig)
if err != nil {
panic(err)
}
roundTripper, upgrader, err := spdy.RoundTripperFor(config)
if err != nil {
panic(err)
}
for i := 0; i < 33; i++ {
log.Printf("ingester-%d\n", i)
path := fmt.Sprintf("/api/v1/namespaces/%s/pods/%s/portforward", "thetradedesk", fmt.Sprintf("ingester-%d", i))
hostIP := strings.TrimLeft(config.Host, "htps:/")
serverURL := url.URL{Scheme: "https", Path: path, Host: hostIP}
dialer := spdy.NewDialer(upgrader, &http.Client{Transport: roundTripper}, http.MethodPost, &serverURL)
stopChan, readyChan := make(chan struct{}, 1), make(chan struct{}, 1)
out, errOut := new(bytes.Buffer), new(bytes.Buffer)
ports := []string{fmt.Sprintf("500%d:9095", i)}
forwarder, err := portforward.New(dialer, ports, stopChan, readyChan, out, errOut)
if err != nil {
panic(err)
}
go func() {
for range readyChan { // Kubernetes will close this channel when it has something to tell us.
}
if len(errOut.String()) != 0 {
panic(errOut.String())
} else if len(out.String()) != 0 {
fmt.Println(out.String())
}
}()
go func() {
if err = forwarder.ForwardPorts(); err != nil { // Locks until stopChan is closed.
panic(err)
}
}()
<-readyChan
conn, err := grpc.Dial(fmt.Sprintf("localhost:500%d", i), grpc.WithInsecure())
if err != nil {
panic(err)
}
defer conn.Close()
c := client.NewIngesterClient(conn)
ctx := user.InjectOrgID(context.Background(), "327")
ctx1, err := user.InjectIntoGRPCRequest(ctx)
if err != nil {
panic(err)
}
end := model.Now()
start := end.Add(-30 * time.Minute)
stream, err := c.QueryStream(ctx1, &client.QueryRequest{
EndTimestampMs: int64(end),
StartTimestampMs: int64(start),
Matchers: []*client.LabelMatcher{
{Type: client.EQUAL, Name: "__name__", Value: "node_cpu_seconds_total"},
{Type: client.EQUAL, Name: "job", Value: "node"},
{Type: client.EQUAL, Name: "mode", Value: "idle"},
},
})
if err != nil {
panic(err)
}
for {
msg, err := stream.Recv()
if err != nil {
if err == io.EOF {
// log.Println("DONE")
stopChan <- struct{}{}
break
}
panic(err)
}
// log.Println("msg rcv")
for _, s := range msg.Chunkseries {
log.Printf("labels: %s =>", client.FromLabelAdaptersToLabels(s.Labels).String())
// for _, c := range s.Chunks {
// ch, err := encoding.NewForEncoding(encoding.Encoding(byte(c.Encoding)))
// if err != nil {
// log.Println(err.Error())
// continue
// }
// err = ch.UnmarshalFromBuf(c.Data)
// if err != nil {
// log.Println(err.Error())
// continue
// }
// it := ch.NewIterator(nil)
// for it.Scan() {
// if it.Err() != nil {
// log.Println(it.Err())
// continue
// }
// spew.Dump(it.Value())
// }
// }
}
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment