Skip to content

Instantly share code, notes, and snippets.

@hagen1778
Last active December 20, 2023 08:57
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save hagen1778/9487f71d9a19e04d521d764b576a0ec2 to your computer and use it in GitHub Desktop.
Save hagen1778/9487f71d9a19e04d521d764b576a0ec2 to your computer and use it in GitHub Desktop.
vmctl trace requests
diff --git a/app/vmctl/main.go b/app/vmctl/main.go
index 95743081f..151e9115b 100644
--- a/app/vmctl/main.go
+++ b/app/vmctl/main.go
@@ -230,7 +230,7 @@ func main() {
if err != nil {
return fmt.Errorf("error initilize auth config for destination: %s", dstAddr)
}
- dstHTTPClient := &http.Client{Transport: &http.Transport{DisableKeepAlives: disableKeepAlive}}
+ dstHTTPClient := &http.Client{Transport: &http.Transport{DisableKeepAlives: disableKeepAlive, ResponseHeaderTimeout: time.Second * 60}}
p := vmNativeProcessor{
rateLimit: c.Int64(vmRateLimit),
@@ -286,7 +286,7 @@ func main() {
return cli.Exit(fmt.Errorf("cannot open exported block at path=%q err=%w", blockPath, err), 1)
}
var blocksCount uint64
- if err := stream.Parse(f, isBlockGzipped, func(block *stream.Block) error {
+ if err := stream.Parse(f, nil, isBlockGzipped, func(block *stream.Block) error {
atomic.AddUint64(&blocksCount, 1)
return nil
}); err != nil {
diff --git a/app/vmctl/vm_native.go b/app/vmctl/vm_native.go
index 3cbea821d..d26f00d6f 100644
--- a/app/vmctl/vm_native.go
+++ b/app/vmctl/vm_native.go
@@ -5,8 +5,10 @@ import (
"fmt"
"io"
"log"
+ "net/url"
"strings"
"sync"
+ "sync/atomic"
"time"
"github.com/cheggaaa/pb/v3"
@@ -114,6 +116,8 @@ func (p *vmNativeProcessor) do(ctx context.Context, f native.Filter, srcURL, dst
return nil
}
+var iterationCounter uint64
+
func (p *vmNativeProcessor) runSingle(ctx context.Context, f native.Filter, srcURL, dstURL string, bar *pb.ProgressBar) error {
reader, err := p.src.ExportPipe(ctx, srcURL, f)
if err != nil {
@@ -125,10 +129,18 @@ func (p *vmNativeProcessor) runSingle(ctx context.Context, f native.Filter, srcU
reader = bar.NewProxyReader(reader)
}
+ iteration := atomic.AddUint64(&iterationCounter, 1)
+ dstURL += fmt.Sprintf("?query=%s", url.QueryEscape(f.Match))
+ dstURL += fmt.Sprintf("&iteration=%d", iteration)
+
pr, pw := io.Pipe()
importCh := make(chan error)
go func() {
- importCh <- p.dst.ImportPipe(ctx, dstURL, pr)
+ importErr := p.dst.ImportPipe(ctx, dstURL, pr)
+ if importErr != nil && strings.Contains(importErr.Error(), "timeout awaiting response headers") {
+ pr.Close()
+ }
+ importCh <- importErr
close(importCh)
}()
@@ -152,7 +164,12 @@ func (p *vmNativeProcessor) runSingle(ctx context.Context, f native.Filter, srcU
return err
}
- return <-importCh
+ start := time.Now()
+ log.Printf("%q: request finished - waiting for import to finish", dstURL)
+ importRes := <-importCh
+ log.Printf("%q: request completed in %v with error: %s", dstURL, time.Since(start), importRes)
+ return importRes
+
}
func (p *vmNativeProcessor) runBackfilling(ctx context.Context, tenantID string, ranges [][]time.Time, silent bool) error {
diff --git a/app/vminsert/native/request_handler.go b/app/vminsert/native/request_handler.go
index f5e7cd714..801873696 100644
--- a/app/vminsert/native/request_handler.go
+++ b/app/vminsert/native/request_handler.go
@@ -27,7 +27,7 @@ func InsertHandler(req *http.Request) error {
return err
}
isGzip := req.Header.Get("Content-Encoding") == "gzip"
- return stream.Parse(req.Body, isGzip, func(block *stream.Block) error {
+ return stream.Parse(req.Body, req, isGzip, func(block *stream.Block) error {
return insertRows(block, extraLabels)
})
}
diff --git a/lib/protoparser/native/stream/streamparser.go b/lib/protoparser/native/stream/streamparser.go
index 9cf4bafc9..71be78157 100644
--- a/lib/protoparser/native/stream/streamparser.go
+++ b/lib/protoparser/native/stream/streamparser.go
@@ -3,8 +3,11 @@ package stream
import (
"bufio"
"fmt"
+ "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"io"
+ "net/http"
"sync"
+ "time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
@@ -19,7 +22,7 @@ import (
// The callback can be called concurrently multiple times for streamed data from r.
//
// callback shouldn't hold block after returning.
-func Parse(r io.Reader, isGzip bool, callback func(block *Block) error) error {
+func Parse(r io.Reader, req *http.Request, isGzip bool, callback func(block *Block) error) error {
wcr := writeconcurrencylimiter.GetReader(r)
defer writeconcurrencylimiter.PutReader(wcr)
r = wcr
@@ -48,19 +51,27 @@ func Parse(r io.Reader, isGzip bool, callback func(block *Block) error) error {
// Read native blocks and feed workers with work.
sizeBuf := make([]byte, 4)
+ processingStart := time.Now()
+
+ defer func() {
+ logger.Infof("%q (%s): whole processing took %v", req.URL.RawQuery, tr.String(), time.Since(processingStart))
+ }()
+
ctx := &streamContext{}
for {
uw := getUnmarshalWork()
uw.tr = tr
uw.ctx = ctx
uw.callback = callback
-
// Read uw.metricNameBuf
if _, err := io.ReadFull(br, sizeBuf); err != nil {
if err == io.EOF {
+ finishStart := time.Now()
+ logger.Infof("%q (%s): EOF received - finishing the work", req.URL.RawQuery, tr.String())
// End of stream
putUnmarshalWork(uw)
ctx.wg.Wait()
+ logger.Infof("%q (%s): last processing is done in %v", req.URL.RawQuery, tr.String(), time.Since(finishStart))
return ctx.err
}
readErrors.Inc()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment