Skip to content

Instantly share code, notes, and snippets.

@f41gh7
Last active January 19, 2024 17:24
Show Gist options
  • Save f41gh7/1a67850e2cc795af05d96085de9c6653 to your computer and use it in GitHub Desktop.
Save f41gh7/1a67850e2cc795af05d96085de9c6653 to your computer and use it in GitHub Desktop.
VictoriaMetrics OTLP push

golang web app with opentelemetry metrics

Example of web-go application.

It could push collected metrics by http with opentelemetry insturmentation.

usage:

  1. build:
go get
go build main.go
  1. use
 #with cluster version
./main -vm.endpoint=localhost:8480 -vm.ingestPath='/insert/0/opentelemetry/api/v1/push'
# with single version
./main -vm.endpoint=localhost:8428 -vm.ingestPath='/opentelemetry/api/v1/push'
module otlp
go 1.18
require (
go.opentelemetry.io/otel v1.7.0
go.opentelemetry.io/otel/exporters/otlp/otlpmetric v0.30.0
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v0.30.0
go.opentelemetry.io/otel/metric v0.30.0
go.opentelemetry.io/otel/sdk v1.7.0
go.opentelemetry.io/otel/sdk/metric v0.30.0
)
require (
github.com/cenkalti/backoff/v4 v4.1.3 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/go-logr/logr v1.2.3 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.7.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.7.0 // indirect
go.opentelemetry.io/otel/trace v1.7.0 // indirect
go.opentelemetry.io/proto/otlp v0.16.0 // indirect
golang.org/x/net v0.0.0-20210525063256-abc453219eb5 // indirect
golang.org/x/sys v0.0.0-20220114195835-da31bd327af9 // indirect
golang.org/x/text v0.3.6 // indirect
google.golang.org/genproto v0.0.0-20211118181313-81c1377c94b1 // indirect
google.golang.org/grpc v1.46.0 // indirect
google.golang.org/protobuf v1.28.0 // indirect
)
package main
import (
"context"
"flag"
"log"
"net/http"
"os"
"os/signal"
"sync/atomic"
"syscall"
"time"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric"
pushclient "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp"
"go.opentelemetry.io/otel/metric/global"
"go.opentelemetry.io/otel/metric/instrument"
"go.opentelemetry.io/otel/metric/instrument/syncfloat64"
"go.opentelemetry.io/otel/metric/instrument/syncint64"
"go.opentelemetry.io/otel/sdk/metric/aggregator/histogram"
controller "go.opentelemetry.io/otel/sdk/metric/controller/basic"
"go.opentelemetry.io/otel/sdk/metric/export/aggregation"
processor "go.opentelemetry.io/otel/sdk/metric/processor/basic"
selector "go.opentelemetry.io/otel/sdk/metric/selector/simple"
"go.opentelemetry.io/otel/sdk/resource"
)
var (
collectorEndpoint = flag.String("vm.endpoint", "localhost:8428", "VictoriaMetrics endpoint - host:port.")
collectorURL = flag.String("vm.ingestPath", "/opentelemetry/api/v1/push", "url path for ingestion path.")
isSecure = flag.Bool("vm.isSecure", false, "enables https connection for metrics push.")
pushInterval = flag.Duration("vm.pushInterval", 10*time.Second, "how often push samples, aka scrapeInterval at pull model.")
jobName = flag.String("metrics.jobName", "otlp", "job name for web-application.")
instanceName = flag.String("metrics.instance", "localhost", "hostname of web-application instance.")
)
var (
requestsCount syncint64.Counter
requestsLatency syncfloat64.Histogram
activeRequests int64
)
// Initializes an OTLP exporter, and configures the corresponding
// metric providers.
func initMetrics(ctx context.Context) func(ctx context.Context) error {
options := []pushclient.Option{
pushclient.WithEndpoint(*collectorEndpoint),
pushclient.WithURLPath(*collectorURL),
}
if !*isSecure {
options = append(options, pushclient.WithInsecure())
}
c := pushclient.NewClient(
options...,
)
metricExporter, err := otlpmetric.New(ctx, c)
if err != nil {
handleErr(err, "cannot create exporter")
}
resourceConfig, err := resource.New(ctx, resource.WithAttributes(attribute.String("job", *jobName), attribute.String("instance", *instanceName)))
if err != nil {
handleErr(err, "cannot create meter resource")
}
meterController := controller.New(
processor.NewFactory(
selector.NewWithHistogramDistribution(
histogram.WithExplicitBoundaries([]float64{0.01, 0.05, 0.1, 0.5, 0.9, 1.0, 5.0, 10.0, 100.0}),
),
aggregation.CumulativeTemporalitySelector(),
processor.WithMemory(true),
),
controller.WithExporter(metricExporter),
controller.WithCollectPeriod(*pushInterval),
controller.WithResource(resourceConfig),
)
if err := meterController.Start(ctx); err != nil {
handleErr(err, "cannot start meter controller")
}
global.SetMeterProvider(meterController)
prov := global.MeterProvider().Meter("")
requestsLatency, err = prov.SyncFloat64().Histogram("http_request_latency_seconds")
if err != nil {
handleErr(err, "cannot create histogram")
}
requestsCount, err = prov.SyncInt64().Counter("http_requests_total")
if err != nil {
handleErr(err, "cannot create counter")
}
ar, err := prov.AsyncInt64().Gauge("http_active_requests")
if err != nil {
handleErr(err, "cannot create gauge")
}
if err := prov.RegisterCallback([]instrument.Asynchronous{ar}, func(ctx context.Context) {
ar.Observe(ctx, atomic.LoadInt64(&activeRequests))
}); err != nil {
handleErr(err, "cannot register callback")
}
return metricExporter.Shutdown
}
type metricMiddleWare struct {
ctx context.Context
h http.Handler
}
func (m *metricMiddleWare) ServeHTTP(w http.ResponseWriter, r *http.Request) {
t := time.Now()
path := r.URL.Path
requestsCount.Add(m.ctx, 1, attribute.String("path", path))
atomic.AddInt64(&activeRequests, 1)
m.h.ServeHTTP(w, r)
atomic.AddInt64(&activeRequests, -1)
requestsLatency.Record(m.ctx, time.Since(t).Seconds(), attribute.String("path", path))
}
func main() {
flag.Parse()
log.Printf("Starting web server...")
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
shutdown := initMetrics(ctx)
mux := http.NewServeMux()
mux.HandleFunc("/api/fast", func(writer http.ResponseWriter, request *http.Request) {
writer.WriteHeader(http.StatusOK)
writer.Write([]byte(`fast ok`))
})
mux.HandleFunc("/api/slow", func(writer http.ResponseWriter, request *http.Request) {
time.Sleep(time.Second * 2)
writer.WriteHeader(http.StatusOK)
writer.Write([]byte(`slow ok`))
})
m := &metricMiddleWare{
ctx: ctx,
h: mux,
}
mustStop := make(chan os.Signal)
signal.Notify(mustStop, os.Interrupt, syscall.SIGTERM)
go func() {
http.ListenAndServe("localhost:8081", m)
}()
<-mustStop
log.Println("receive shutdown signal, stopping webserver")
if err := shutdown(ctx); err != nil {
log.Println("cannot shutdown metric provider ", err)
}
cancel()
log.Printf("Done!")
}
func handleErr(err error, message string) {
if err != nil {
log.Fatalf("%s: %v", message, err)
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment