Skip to content

Instantly share code, notes, and snippets.

@ledmonster
Created August 25, 2020 04:45
Show Gist options
  • Save ledmonster/7d0de61a788b164a510c0359465a41f4 to your computer and use it in GitHub Desktop.
Save ledmonster/7d0de61a788b164a510c0359465a41f4 to your computer and use it in GitHub Desktop.
Push Prometheus exposition-format metrics to the VictoriaMetrics remote_write endpoint
package metrics
import (
"bytes"
"context"
"fmt"
"net/http"
"os"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/prometheus"
"github.com/golang/snappy"
log "github.com/sirupsen/logrus"
)
// VictoriaMetricsEndpoint is the base URL of the VictoriaMetrics server, read
// from the VICTORIA_METRICS_ENDPOINT environment variable when the package is
// initialized. pushToVictoriaMetrics appends "/api/v1/write" to it.
var VictoriaMetricsEndpoint = os.Getenv("VICTORIA_METRICS_ENDPOINT")
// pushToVictoriaMetrics parses data as Prometheus text exposition format,
// stamps every sample with the current time (in milliseconds), sets the job
// and instance labels on every series, and POSTs the result to
// VictoriaMetricsEndpoint + "/api/v1/write" as a snappy-compressed protobuf
// WriteRequest.
//
// The HTTP request is bound to ctx, so cancelling ctx (or exceeding its
// deadline) aborts the push. A nil ctx is treated as context.Background().
// Parse errors reported by the parser are logged, not returned.
//
// It returns an error if the request cannot be built or sent, or if the
// server answers with a status code >= 400.
func pushToVictoriaMetrics(ctx context.Context, job, instance, data string) error {
	if ctx == nil {
		// Earlier versions ignored ctx entirely; keep nil callers working.
		ctx = context.Background()
	}

	series := parseData(data, time.Now().Unix()*1000, func(s string) {
		log.Errorf("parse error: %s", s)
	})
	// append same labels as pushgateway
	addLabels(series, []prompbmarshal.Label{
		{Name: "job", Value: job},
		{Name: "instance", Value: instance},
	})

	wr := prompbmarshal.WriteRequest{Timeseries: series}
	raw := prompbmarshal.MarshalWriteRequest(nil, &wr)
	compressed := snappy.Encode(nil, raw)

	req, err := http.NewRequestWithContext(ctx, http.MethodPost, VictoriaMetricsEndpoint+"/api/v1/write", bytes.NewReader(compressed))
	if err != nil {
		return fmt.Errorf("failed to build VictoriaMetrics request: %w", err)
	}
	// The payload is snappy-compressed protobuf, not JSON; advertise it the
	// way the Prometheus remote write protocol expects.
	req.Header.Set("Content-Type", "application/x-protobuf")
	req.Header.Set("Content-Encoding", "snappy")
	req.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.0")

	res, err := http.DefaultClient.Do(req)
	if err != nil {
		return fmt.Errorf("failed to post metrics to VictoriaMetrics: %w", err)
	}
	defer res.Body.Close()

	if res.StatusCode >= 400 {
		return fmt.Errorf("failed to post metrics to VictoriaMetrics. response code: %d", res.StatusCode)
	}
	return nil
}
// parseData converts Prometheus text exposition data into one TimeSeries per
// parsed row. Each series carries the metric name under the "__name__" label
// followed by the row's tags, and a single sample holding the row's value at
// the supplied timestamp. Parse problems are handed to errLogger.
//
// Adapted from:
// https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/lib/promscrape/scrapework_test.go#L314
func parseData(data string, timestamp int64, errLogger func(s string)) []prompbmarshal.TimeSeries {
	var parsed parser.Rows
	parsed.UnmarshalWithErrLogger(data, errLogger)

	// Left nil on purpose so that input without rows yields a nil slice.
	var result []prompbmarshal.TimeSeries
	for i := range parsed.Rows {
		row := &parsed.Rows[i]

		// The metric name always comes first, then the row's own tags.
		lbls := []prompbmarshal.Label{{Name: "__name__", Value: row.Metric}}
		for k := range row.Tags {
			lbls = append(lbls, prompbmarshal.Label{
				Name:  row.Tags[k].Key,
				Value: row.Tags[k].Value,
			})
		}

		result = append(result, prompbmarshal.TimeSeries{
			Labels: lbls,
			Samples: []prompbmarshal.Sample{
				{Value: row.Value, Timestamp: timestamp},
			},
		})
	}
	return result
}
// addLabels sets every label in labels on every element of series, modifying
// series in place. If a series already has a label with the same name, the
// value of its first such label is overwritten; otherwise the label is
// appended to that series.
func addLabels(series []prompbmarshal.TimeSeries, labels []prompbmarshal.Label) {
	for i := range series {
		ts := &series[i]
		for _, extra := range labels {
			replaced := false
			for j := range ts.Labels {
				if ts.Labels[j].Name == extra.Name {
					// Only the first label with this name is overwritten.
					ts.Labels[j].Value = extra.Value
					replaced = true
					break
				}
			}
			if !replaced {
				ts.Labels = append(ts.Labels, extra)
			}
		}
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment