Skip to content

Instantly share code, notes, and snippets.

@bonnefoa
Last active March 21, 2024 09:11
Show Gist options
  • Save bonnefoa/07337e16cdfc0815f7c81f192231133d to your computer and use it in GitHub Desktop.
Save bonnefoa/07337e16cdfc0815f7c81f192231133d to your computer and use it in GitHub Desktop.
pg-tracing-forwarder example
# If you prefer the allow list template instead of the deny list, see community template:
# https://github.com/github/gitignore/blob/main/community/Golang/Go.AllowList.gitignore
#
# Binaries for programs and plugins
*.exe
*.exe~
*.dll
*.so
*.dylib
# Test binary, built with `go test -c`
*.test
# Output of the go coverage tool, specifically when used with LiteIDE
*.out
# Dependency directories (remove the comment below to include it)
# vendor/
# Go workspace file
go.work
module github.com/DataDog/pg-tracing-forwarder
go 1.22.1
require (
github.com/jackc/pgx/v5 v5.5.5
go.opentelemetry.io/otel v1.24.0
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.24.0
go.opentelemetry.io/otel/sdk v1.24.0
go.opentelemetry.io/otel/trace v1.24.0
google.golang.org/grpc v1.62.1
)
require (
github.com/cenkalti/backoff/v4 v4.2.1 // indirect
github.com/go-logr/logr v1.4.1 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/golang/protobuf v1.5.4 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.1 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20231201235250-de7065d80cb9 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.24.0 // indirect
go.opentelemetry.io/otel/metric v1.24.0 // indirect
go.opentelemetry.io/proto/otlp v1.1.0 // indirect
golang.org/x/crypto v0.21.0 // indirect
golang.org/x/net v0.22.0 // indirect
golang.org/x/sys v0.18.0 // indirect
golang.org/x/text v0.14.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20240318140521-94a12d6c2237 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240318140521-94a12d6c2237 // indirect
google.golang.org/protobuf v1.33.0 // indirect
)
github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM=
github.com/cenkalti/backoff/v4 v4.2.1/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/logr v1.4.1 h1:pKouT5E8xu9zeFC39JXRDukb6JFQPXM5p5I91188VAQ=
github.com/go-logr/logr v1.4.1/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek=
github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.1 h1:/c3QmbOGMGTOumP2iT/rCwB7b0QDGLKzqOmktBjT+Is=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.1/go.mod h1:5SN9VR2LTsRFsrEC6FHgRbTWrTHu6tqPeKxEQv15giM=
github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM=
github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg=
github.com/jackc/pgservicefile v0.0.0-20231201235250-de7065d80cb9 h1:L0QtFUgDarD7Fpv9jeVMgy/+Ec0mtnmYuImjTz6dtDA=
github.com/jackc/pgservicefile v0.0.0-20231201235250-de7065d80cb9/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM=
github.com/jackc/pgx/v5 v5.5.5 h1:amBjrZVmksIdNjxGW/IiIMzxMKZFelXbUoPNb+8sjQw=
github.com/jackc/pgx/v5 v5.5.5/go.mod h1:ez9gk+OAat140fv9ErkZDYFWmXLfV+++K0uAOiwgm1A=
github.com/jackc/puddle/v2 v2.2.1 h1:RhxXJtFG022u4ibrCSMSiu5aOq1i77R3OHKNJj77OAk=
github.com/jackc/puddle/v2 v2.2.1/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
go.opentelemetry.io/otel v1.24.0 h1:0LAOdjNmQeSTzGBzduGe/rU4tZhMwL5rWgtp9Ku5Jfo=
go.opentelemetry.io/otel v1.24.0/go.mod h1:W7b9Ozg4nkF5tWI5zsXkaKKDjdVjpD4oAt9Qi/MArHo=
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.24.0 h1:t6wl9SPayj+c7lEIFgm4ooDBZVb01IhLB4InpomhRw8=
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.24.0/go.mod h1:iSDOcsnSA5INXzZtwaBPrKp/lWu/V14Dd+llD0oI2EA=
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.24.0 h1:Mw5xcxMwlqoJd97vwPxA8isEaIoxsta9/Q51+TTJLGE=
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.24.0/go.mod h1:CQNu9bj7o7mC6U7+CA/schKEYakYXWr79ucDHTMGhCM=
go.opentelemetry.io/otel/metric v1.24.0 h1:6EhoGWWK28x1fbpA4tYTOWBkPefTDQnb8WSGXlc88kI=
go.opentelemetry.io/otel/metric v1.24.0/go.mod h1:VYhLe1rFfxuTXLgj4CBiyz+9WYBA8pNGJgDcSFRKBco=
go.opentelemetry.io/otel/sdk v1.24.0 h1:YMPPDNymmQN3ZgczicBY3B6sf9n62Dlj9pWD3ucgoDw=
go.opentelemetry.io/otel/sdk v1.24.0/go.mod h1:KVrIYw6tEubO9E96HQpcmpTKDVn9gdv35HoYiQWGDFg=
go.opentelemetry.io/otel/trace v1.24.0 h1:CsKnnL4dUAr/0llH9FKuc698G04IrpWV0MQA/Y1YELI=
go.opentelemetry.io/otel/trace v1.24.0/go.mod h1:HPc3Xr/cOApsBI154IU0OI0HJexz+aw5uPdbs3UCjNU=
go.opentelemetry.io/proto/otlp v1.1.0 h1:2Di21piLrCqJ3U3eXGCTPHE9R8Nh+0uglSnOyxikMeI=
go.opentelemetry.io/proto/otlp v1.1.0/go.mod h1:GpBHCBWiqvVLDqmHZsoMM3C5ySeKTC7ej/RNTae6MdY=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
golang.org/x/crypto v0.21.0 h1:X31++rzVUdKhX5sWmSOFZxx8UW/ldWx55cbf08iNAMA=
golang.org/x/crypto v0.21.0/go.mod h1:0BP7YvVV9gBbVKyeTG0Gyn+gZm94bibOW5BjDEYAOMs=
golang.org/x/net v0.22.0 h1:9sGLhx7iRIHEiX0oAJ3MRZMUCElJgy7Br1nO+AMN3Tc=
golang.org/x/net v0.22.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg=
golang.org/x/sync v0.6.0 h1:5BMeUDZ7vkXGfEr1x9B4bRcTH4lpkTkpdh0T/J+qjbQ=
golang.org/x/sync v0.6.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4=
golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ=
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
google.golang.org/genproto/googleapis/api v0.0.0-20240318140521-94a12d6c2237 h1:RFiFrvy37/mpSpdySBDrUdipW/dHwsRwh3J3+A9VgT4=
google.golang.org/genproto/googleapis/api v0.0.0-20240318140521-94a12d6c2237/go.mod h1:Z5Iiy3jtmioajWHDGFk7CeugTyHtPvMHA4UTmUkyalE=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240318140521-94a12d6c2237 h1:NnYq6UN9ReLM9/Y01KWNOWyI5xQ9kbIms5GGJVwS/Yc=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240318140521-94a12d6c2237/go.mod h1:WtryC6hu0hhx87FDGxWCDptyssuo68sk10vYjF+T9fY=
google.golang.org/grpc v1.62.1 h1:B4n+nfKzOICUXMgyrNd19h/I9oH0L1pizfk1d4zSgTk=
google.golang.org/grpc v1.62.1/go.mod h1:IWTG0VlJLCh1SkC58F7np9ka9mx/WNkjl4PGJaiq+QE=
google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI=
google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
package main
import (
"context"
"flag"
"log"
"os"
"os/signal"
"github.com/jackc/pgx/v5"
"go.opentelemetry.io/otel"
)
func main() {
var dbConnString string
var otelTarget string
var serviceName string
var tracerName string
flag.StringVar(&dbConnString, "db-url", "", "Database Connection string")
flag.StringVar(&otelTarget, "otel-target", "localhost:4317", "Target of otel collector")
flag.StringVar(&serviceName, "service-name", "PostgreSQL", "Service Name for the generated spans")
flag.StringVar(&tracerName, "tracer-name", "pgtracing-tracer", "Name of the tracer")
flag.Parse()
ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
defer cancel()
fixedIdGenerator := &FixedIdGenerator{}
tracerShutdown, err := initTracerProvider(ctx, otelTarget, serviceName, fixedIdGenerator)
if err != nil {
log.Fatalf("Error when initialising otel provider: %v", err)
}
defer func() {
if err := tracerShutdown(ctx); err != nil {
log.Fatal("failed to shutdown TracerProvider: %w", err)
}
}()
conn, err := pgx.Connect(ctx, dbConnString)
if err != nil {
log.Fatalf("Error when connecting to PostgreSQL: %v", err)
}
defer conn.Close(ctx)
pgTracingSpans, err := fetchSpans(ctx, conn)
if err != nil {
log.Fatal(err)
}
tracer := otel.Tracer(tracerName)
for _, span := range pgTracingSpans {
forwardSpan(span, tracer, fixedIdGenerator)
}
log.Printf("Done!")
}
package main
import (
"context"
"fmt"
"time"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/sdk/resource"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.17.0"
"go.opentelemetry.io/otel/trace"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)
type FixedIdGenerator struct {
FixedSpanID trace.SpanID
FixedTraceID trace.TraceID
}
func (f *FixedIdGenerator) NewSpanID(ctx context.Context, traceID trace.TraceID) trace.SpanID {
return f.FixedSpanID
}
func (f *FixedIdGenerator) NewIDs(ctx context.Context) (trace.TraceID, trace.SpanID) {
return f.FixedTraceID, f.FixedSpanID
}
func initTracerProvider(ctx context.Context, otelTarget string, serviceName string, fixedIdGenerator *FixedIdGenerator) (func(context.Context) error, error) {
res, err := resource.New(ctx,
resource.WithAttributes(
semconv.ServiceName(serviceName),
),
)
if err != nil {
return nil, fmt.Errorf("failed to create resource: %w", err)
}
ctx, cancel := context.WithTimeout(ctx, time.Second)
defer cancel()
conn, err := grpc.DialContext(ctx, otelTarget,
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithBlock(),
)
if err != nil {
return nil, fmt.Errorf("failed to create gRPC connection to collector: %w", err)
}
// Set up the trace exporter
traceExporter, err := otlptracegrpc.New(ctx, otlptracegrpc.WithGRPCConn(conn))
if err != nil {
return nil, fmt.Errorf("failed to create trace exporter: %w", err)
}
// Register the trace exporter with a TracerProvider, using a batch
// span processor to aggregate spans before export.
bsp := sdktrace.NewBatchSpanProcessor(traceExporter)
tracerProvider := sdktrace.NewTracerProvider(
sdktrace.WithSampler(sdktrace.AlwaysSample()),
sdktrace.WithResource(res),
sdktrace.WithSpanProcessor(bsp),
sdktrace.WithIDGenerator(fixedIdGenerator),
)
otel.SetTracerProvider(tracerProvider)
otel.SetTextMapPropagator(propagation.TraceContext{})
return tracerProvider.Shutdown, nil
}
package main
import (
"context"
"database/sql"
"fmt"
"log"
"strings"
"time"
"github.com/jackc/pgx/v5"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)
type PgTracingSpan struct {
TraceId string `db:"trace_id"`
ParentId string `db:"parent_id"`
SpanId string `db:"span_id"`
QueryId int64 `db:"query_id"`
SpanType string `db:"span_type"`
SpanOperation string `db:"span_operation"`
Parameters sql.NullString `db:"parameters"`
SpanStart time.Time `db:"span_start"`
SpanEnd time.Time `db:"span_end"`
Pid int32 `db:"pid"`
SubxactCount int32 `db:"subxact_count"`
SqlErrorCode string `db:"sql_error_code"`
Rows sql.NullInt64 `db:"rows"`
PlanStartupCost sql.NullFloat64 `db:"plan_startup_cost"`
PlanTotalCost sql.NullFloat64 `db:"plan_total_cost"`
PlanRows sql.NullFloat64 `db:"plan_rows"`
PlanWidth sql.NullInt64 `db:"plan_width"`
SharedBlksHit sql.NullInt64 `db:"shared_blks_hit"`
SharedBlksRead sql.NullInt64 `db:"shared_blks_read"`
SharedBlksWritten sql.NullInt64 `db:"shared_blks_written"`
SharedBlksDirtied sql.NullInt64 `db:"shared_blks_dirtied"`
LocalBlksHit sql.NullInt64 `db:"local_blks_hit"`
LocalBlksRead sql.NullInt64 `db:"local_blks_read"`
LocalBlksWritten sql.NullInt64 `db:"local_blks_written"`
LocalBlksDirtied sql.NullInt64 `db:"local_blks_dirtied"`
BlkReadTime sql.NullFloat64 `db:"blk_read_time"`
BlkWriteTime sql.NullFloat64 `db:"blk_write_time"`
TempBlksRead sql.NullInt64 `db:"temp_blks_read"`
TempBlksWritten sql.NullInt64 `db:"temp_blks_written"`
TempBlksReadTime sql.NullFloat64 `db:"temp_blk_read_time"`
TempBlksWriteTime sql.NullFloat64 `db:"temp_blk_write_time"`
WalRecords sql.NullInt64 `db:"wal_records"`
WalFpi sql.NullInt64 `db:"wal_fpi"`
WalBytes sql.NullInt64 `db:"wal_bytes"`
JitFunctions sql.NullInt64 `db:"jit_functions"`
JitGenerationTime sql.NullFloat64 `db:"jit_generation_time"`
JitInliningTime sql.NullFloat64 `db:"jit_inlining_time"`
JitOptimizationTime sql.NullFloat64 `db:"jit_optimization_time"`
JitEmissionTime sql.NullFloat64 `db:"jit_emission_time"`
}
func setMetricIfValueFloat(attributes []attribute.KeyValue, key string, value sql.NullFloat64) []attribute.KeyValue {
if !value.Valid || value.Float64 == 0 {
return attributes
}
return append(attributes, attribute.Float64(key, value.Float64))
}
func setMetricIfValue(attributes []attribute.KeyValue, key string, value sql.NullInt64) []attribute.KeyValue {
if !value.Valid || value.Int64 == 0 {
return attributes
}
return append(attributes, attribute.Int64(key, value.Int64))
}
func fetchSpans(ctx context.Context, conn *pgx.Conn) ([]PgTracingSpan, error) {
query := `select
trace_id, parent_id, span_id, query_id,
span_type, span_operation, parameters,
span_start, span_end,
pid, subxact_count, sql_error_code, rows,
plan_startup_cost, plan_total_cost, plan_rows, plan_width,
shared_blks_hit, shared_blks_read, shared_blks_dirtied, shared_blks_written,
local_blks_hit, local_blks_read, local_blks_dirtied, local_blks_written,
blk_read_time, blk_write_time,
temp_blks_read, temp_blks_written, temp_blk_read_time, temp_blk_write_time,
wal_records, wal_fpi, wal_bytes,
jit_functions, jit_generation_time, jit_inlining_time, jit_optimization_time, jit_emission_time
from pg_tracing_consume_spans order by span_start;`
rows, err := conn.Query(ctx, query)
if err != nil {
return nil, err
}
spans, err := pgx.CollectRows(rows, pgx.RowToStructByName[PgTracingSpan])
if err != nil {
return nil, fmt.Errorf("Error fetching pg_tracing spans %w", err)
}
return spans, nil
}
func generate_meta_parameters(meta map[string]string, parameterString string) {
parameters := strings.Split(parameterString, ", ")
for _, param := range parameters {
paramSlice := strings.SplitN(param, "=", 2)
meta[fmt.Sprintf("parameters.%s", paramSlice[0])] = paramSlice[1]
}
}
func forwardSpan(span PgTracingSpan, tracer trace.Tracer, fixedIdGenerator *FixedIdGenerator) error {
log.Printf("traceId: %s, parentId: %s, spanId: %s, span_operation: %s, start: %s, end: %s",
span.TraceId, span.ParentId, span.SpanId, span.SpanOperation, span.SpanStart, span.SpanEnd)
traceId, err := trace.TraceIDFromHex(span.TraceId)
if err != nil {
return fmt.Errorf("Failed parsing traceId %w", err)
}
parentId, err := trace.SpanIDFromHex(span.ParentId)
if err != nil {
return fmt.Errorf("Failed parsing parentId %w", err)
}
spanId, err := trace.SpanIDFromHex(span.SpanId)
if err != nil {
return fmt.Errorf("Failed parsing spanId %w", err)
}
attributes := make([]attribute.KeyValue, 0)
setMetricIfValue(attributes, "rows", span.Rows)
attributes = append(attributes, attribute.Int("pid", int(span.Pid)))
attributes = append(attributes, attribute.Int("subxact_count", int(span.SubxactCount)))
attributes = setMetricIfValue(attributes, "block.shared.hit", span.SharedBlksHit)
attributes = setMetricIfValue(attributes, "block.shared.read", span.SharedBlksRead)
attributes = setMetricIfValue(attributes, "block.shared.dirtied", span.SharedBlksDirtied)
attributes = setMetricIfValue(attributes, "block.shared.written", span.SharedBlksWritten)
attributes = setMetricIfValue(attributes, "block.local.hit", span.LocalBlksHit)
attributes = setMetricIfValue(attributes, "block.local.read", span.LocalBlksRead)
attributes = setMetricIfValue(attributes, "block.local.dirtied", span.LocalBlksDirtied)
attributes = setMetricIfValue(attributes, "block.local.written", span.LocalBlksWritten)
attributes = setMetricIfValueFloat(attributes, "block.read_time", span.BlkReadTime)
attributes = setMetricIfValueFloat(attributes, "block.write_time", span.BlkWriteTime)
attributes = setMetricIfValue(attributes, "block.temp.read", span.TempBlksRead)
attributes = setMetricIfValue(attributes, "block.temp.written", span.TempBlksWritten)
attributes = setMetricIfValueFloat(attributes, "block.temp.read_time", span.TempBlksReadTime)
attributes = setMetricIfValueFloat(attributes, "block.temp.write_time", span.TempBlksWriteTime)
attributes = setMetricIfValue(attributes, "wal.records", span.WalRecords)
attributes = setMetricIfValue(attributes, "wal.fpi", span.WalFpi)
attributes = setMetricIfValue(attributes, "wal.bytes", span.WalBytes)
attributes = setMetricIfValueFloat(attributes, "plan.startup_cost", span.PlanStartupCost)
attributes = setMetricIfValueFloat(attributes, "plan.total_cost", span.PlanTotalCost)
attributes = setMetricIfValueFloat(attributes, "plan.rows", span.PlanRows)
attributes = setMetricIfValue(attributes, "plan.width", span.PlanWidth)
attributes = setMetricIfValue(attributes, "jit.functions", span.JitFunctions)
attributes = setMetricIfValueFloat(attributes, "jit.generation_time", span.JitGenerationTime)
attributes = setMetricIfValueFloat(attributes, "jit.inlining_time", span.JitInliningTime)
attributes = setMetricIfValueFloat(attributes, "jit.optimization_time", span.JitOptimizationTime)
attributes = setMetricIfValueFloat(attributes, "jit.emission_time", span.JitEmissionTime)
if span.SqlErrorCode != "00000" {
attributes = append(attributes, attribute.String("error.msg", "Query error"))
attributes = append(attributes, attribute.String("error.msg", span.SqlErrorCode))
}
if span.Parameters.Valid {
// We're expecting something like
// $1 = '1', $2 = '2'
parameters := strings.Split(span.Parameters.String, ", ")
for _, param := range parameters {
paramSlice := strings.SplitN(param, "=", 2)
attributes = append(attributes, attribute.String(fmt.Sprintf("parameters.%s", paramSlice[0]), paramSlice[1]))
}
}
// Build the parent span context
psc := trace.SpanContext{}
psc = psc.WithTraceID(traceId)
psc = psc.WithSpanID(parentId)
ctx := trace.ContextWithSpanContext(context.Background(), psc)
// Modify the fixed spanID generator before starting the span
fixedIdGenerator.FixedSpanID = spanId
startOptions := []trace.SpanStartOption{
trace.WithTimestamp(span.SpanStart),
trace.WithAttributes(attributes...),
trace.WithSpanKind(trace.SpanKindServer),
}
_, otelSpan := tracer.Start(ctx, span.SpanOperation, startOptions...)
// Set the end span
endOptions := []trace.SpanEndOption{
trace.WithTimestamp(span.SpanEnd),
}
otelSpan.End(endOptions...)
return nil
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment