Skip to content

Instantly share code, notes, and snippets.

@hagen1778
Created December 5, 2023 15:35
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save hagen1778/a38a0d5348a1d5ee5912b9a281d4fc2b to your computer and use it in GitHub Desktop.
Save hagen1778/a38a0d5348a1d5ee5912b9a281d4fc2b to your computer and use it in GitHub Desktop.
vmctl HTTP trace
--- a/app/vmctl/vm_native.go
+++ b/app/vmctl/vm_native.go
@@ -5,8 +5,11 @@ import (
"fmt"
"io"
"log"
+ "net/http/httptrace"
+ "sort"
"strings"
"sync"
+ "sync/atomic"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/backoff"
@@ -35,6 +38,8 @@ type vmNativeProcessor struct {
disableRetries bool
isSilent bool
isNative bool
+
+ reqCounter atomic.Int32
}
const (
@@ -113,7 +118,47 @@ func (p *vmNativeProcessor) do(ctx context.Context, f native.Filter, srcURL, dst
return nil
}
+type journal struct { // journal stores messages grouped under an integer key.
+ jmu sync.Mutex // jmu guards j; Print and Printf lock it around every access to j.
+ j map[int][]string // j maps a key to its messages, kept in the order they were appended.
+}
+
+// Print writes every recorded message to stdout, grouped by key in ascending
+// key order. Each message is printed as "<key> : <message>" and every group is
+// followed by an empty line. The journal lock is held for the whole call.
+func (j *journal) Print() {
+	j.jmu.Lock()
+	defer j.jmu.Unlock()
+
+	// Map iteration order is random, so collect and sort the keys first.
+	iterations := make([]int, 0, len(j.j))
+	for iteration := range j.j {
+		iterations = append(iterations, iteration)
+	}
+	sort.Ints(iterations)
+
+	for idx := 0; idx < len(iterations); idx++ {
+		iteration := iterations[idx]
+		records := j.j[iteration]
+		for n := range records {
+			fmt.Println(iteration, ":", records[n])
+		}
+		fmt.Println("")
+	}
+}
+
+// Printf formats s with args, logs the result via the standard logger and
+// appends it, prefixed with the current time in time.DateTime layout and a
+// tab, to the list of messages stored under key i.
+//
+// It is safe to call on a zero-value journal: the underlying map is allocated
+// on first use.
+func (j *journal) Printf(i int32, s string, args ...any) {
+	// Take the timestamp before formatting and logging so it reflects the
+	// moment of the call rather than the moment the lock was acquired.
+	now := time.Now()
+	msg := fmt.Sprintf(s, args...)
+	log.Print(msg)
+	entry := fmt.Sprintf("%v\t%s", now.Format(time.DateTime), msg)
+
+	j.jmu.Lock()
+	defer j.jmu.Unlock()
+	if j.j == nil {
+		// Assigning to an entry of a nil map panics; allocate lazily so a
+		// journal declared without an explicit map still works.
+		j.j = make(map[int][]string)
+	}
+	key := int(i)
+	j.j[key] = append(j.j[key], entry)
+}
+
+// runJournal is the package-level journal that runSingle writes to and
+// runBackfilling prints.
+var runJournal = journal{j: map[int][]string{}}
+
func (p *vmNativeProcessor) runSingle(ctx context.Context, f native.Filter, srcURL, dstURL string, bar *pb.ProgressBar) error {
+ startTime := time.Now()
+ var exportDuration time.Duration
+
+ iteration := p.reqCounter.Add(1)
+ runJournal.Printf(iteration, "%q: starting", f.Match)
+
reader, err := p.src.ExportPipe(ctx, srcURL, f)
if err != nil {
return fmt.Errorf("failed to init export pipe: %w", err)
@@ -126,12 +171,34 @@ func (p *vmNativeProcessor) runSingle(ctx context.Context, f native.Filter, srcU
pr, pw := io.Pipe()
done := make(chan struct{})
+
+ var importDuration time.Duration
go func() {
defer func() { close(done) }()
+ startTime := time.Now()
+ trace := &httptrace.ClientTrace{
+ WroteRequest: func(wri httptrace.WroteRequestInfo) {
+ runJournal.Printf(iteration, "\t%q==>WroteRequest; err: %+v", f.Match, wri)
+ },
+ GotConn: func(gci httptrace.GotConnInfo) {
+ runJournal.Printf(iteration, "\t%q==>GotConn: %+v", f.Match, gci)
+ },
+ GetConn: func(string) {
+ runJournal.Printf(iteration, "\t%q==>GetConn", f.Match)
+ },
+ WroteHeaders: func() {
+ runJournal.Printf(iteration, "\t%q==>WroteHeaders", f.Match)
+ },
+ GotFirstResponseByte: func() {
+ runJournal.Printf(iteration, "\t%q==>GotFirstResponseByte", f.Match)
+ },
+ }
+ ctx = httptrace.WithClientTrace(ctx, trace)
if err := p.dst.ImportPipe(ctx, dstURL, pr); err != nil {
logger.Errorf("error initialize import pipe: %s", err)
return
}
+ importDuration = time.Since(startTime)
}()
w := io.Writer(pw)
@@ -153,7 +220,11 @@ func (p *vmNativeProcessor) runSingle(ctx context.Context, f native.Filter, srcU
if err := pw.Close(); err != nil {
return err
}
+
+ exportDuration = time.Since(startTime)
<-done
+ runJournal.Printf(iteration, "%q: export took %v; import took %v; delay: %v; bytesWritten: %s",
+ f.Match, exportDuration, importDuration, importDuration-exportDuration, byteCountSI(written))
return nil
}
@@ -297,6 +368,8 @@ func (p *vmNativeProcessor) runBackfilling(ctx context.Context, tenantID string,
return fmt.Errorf("import process failed: %s", err)
}
+ runJournal.Print()
+
return nil
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment