Last active
March 21, 2024 09:11
-
-
Save bonnefoa/07337e16cdfc0815f7c81f192231133d to your computer and use it in GitHub Desktop.
pg-tracing-forwarder example
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# If you prefer the allow list template instead of the deny list, see community template: | |
# https://github.com/github/gitignore/blob/main/community/Golang/Go.AllowList.gitignore | |
# | |
# Binaries for programs and plugins | |
*.exe | |
*.exe~ | |
*.dll | |
*.so | |
*.dylib | |
# Test binary, built with `go test -c` | |
*.test | |
# Output of the go coverage tool, specifically when used with LiteIDE | |
*.out | |
# Dependency directories (remove the comment below to include it) | |
# vendor/ | |
# Go workspace file | |
go.work |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
module github.com/DataDog/pg-tracing-forwarder | |
go 1.22.1 | |
require ( | |
github.com/jackc/pgx/v5 v5.5.5 | |
go.opentelemetry.io/otel v1.24.0 | |
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.24.0 | |
go.opentelemetry.io/otel/sdk v1.24.0 | |
go.opentelemetry.io/otel/trace v1.24.0 | |
google.golang.org/grpc v1.62.1 | |
) | |
require ( | |
github.com/cenkalti/backoff/v4 v4.2.1 // indirect | |
github.com/go-logr/logr v1.4.1 // indirect | |
github.com/go-logr/stdr v1.2.2 // indirect | |
github.com/golang/protobuf v1.5.4 // indirect | |
github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.1 // indirect | |
github.com/jackc/pgpassfile v1.0.0 // indirect | |
github.com/jackc/pgservicefile v0.0.0-20231201235250-de7065d80cb9 // indirect | |
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.24.0 // indirect | |
go.opentelemetry.io/otel/metric v1.24.0 // indirect | |
go.opentelemetry.io/proto/otlp v1.1.0 // indirect | |
golang.org/x/crypto v0.21.0 // indirect | |
golang.org/x/net v0.22.0 // indirect | |
golang.org/x/sys v0.18.0 // indirect | |
golang.org/x/text v0.14.0 // indirect | |
google.golang.org/genproto/googleapis/api v0.0.0-20240318140521-94a12d6c2237 // indirect | |
google.golang.org/genproto/googleapis/rpc v0.0.0-20240318140521-94a12d6c2237 // indirect | |
google.golang.org/protobuf v1.33.0 // indirect | |
) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM= | |
github.com/cenkalti/backoff/v4 v4.2.1/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= | |
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= | |
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= | |
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= | |
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= | |
github.com/go-logr/logr v1.4.1 h1:pKouT5E8xu9zeFC39JXRDukb6JFQPXM5p5I91188VAQ= | |
github.com/go-logr/logr v1.4.1/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= | |
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= | |
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= | |
github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= | |
github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= | |
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= | |
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= | |
github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.1 h1:/c3QmbOGMGTOumP2iT/rCwB7b0QDGLKzqOmktBjT+Is= | |
github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.1/go.mod h1:5SN9VR2LTsRFsrEC6FHgRbTWrTHu6tqPeKxEQv15giM= | |
github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= | |
github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= | |
github.com/jackc/pgservicefile v0.0.0-20231201235250-de7065d80cb9 h1:L0QtFUgDarD7Fpv9jeVMgy/+Ec0mtnmYuImjTz6dtDA= | |
github.com/jackc/pgservicefile v0.0.0-20231201235250-de7065d80cb9/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM= | |
github.com/jackc/pgx/v5 v5.5.5 h1:amBjrZVmksIdNjxGW/IiIMzxMKZFelXbUoPNb+8sjQw= | |
github.com/jackc/pgx/v5 v5.5.5/go.mod h1:ez9gk+OAat140fv9ErkZDYFWmXLfV+++K0uAOiwgm1A= | |
github.com/jackc/puddle/v2 v2.2.1 h1:RhxXJtFG022u4ibrCSMSiu5aOq1i77R3OHKNJj77OAk= | |
github.com/jackc/puddle/v2 v2.2.1/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= | |
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= | |
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= | |
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= | |
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= | |
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= | |
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= | |
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= | |
go.opentelemetry.io/otel v1.24.0 h1:0LAOdjNmQeSTzGBzduGe/rU4tZhMwL5rWgtp9Ku5Jfo= | |
go.opentelemetry.io/otel v1.24.0/go.mod h1:W7b9Ozg4nkF5tWI5zsXkaKKDjdVjpD4oAt9Qi/MArHo= | |
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.24.0 h1:t6wl9SPayj+c7lEIFgm4ooDBZVb01IhLB4InpomhRw8= | |
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.24.0/go.mod h1:iSDOcsnSA5INXzZtwaBPrKp/lWu/V14Dd+llD0oI2EA= | |
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.24.0 h1:Mw5xcxMwlqoJd97vwPxA8isEaIoxsta9/Q51+TTJLGE= | |
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.24.0/go.mod h1:CQNu9bj7o7mC6U7+CA/schKEYakYXWr79ucDHTMGhCM= | |
go.opentelemetry.io/otel/metric v1.24.0 h1:6EhoGWWK28x1fbpA4tYTOWBkPefTDQnb8WSGXlc88kI= | |
go.opentelemetry.io/otel/metric v1.24.0/go.mod h1:VYhLe1rFfxuTXLgj4CBiyz+9WYBA8pNGJgDcSFRKBco= | |
go.opentelemetry.io/otel/sdk v1.24.0 h1:YMPPDNymmQN3ZgczicBY3B6sf9n62Dlj9pWD3ucgoDw= | |
go.opentelemetry.io/otel/sdk v1.24.0/go.mod h1:KVrIYw6tEubO9E96HQpcmpTKDVn9gdv35HoYiQWGDFg= | |
go.opentelemetry.io/otel/trace v1.24.0 h1:CsKnnL4dUAr/0llH9FKuc698G04IrpWV0MQA/Y1YELI= | |
go.opentelemetry.io/otel/trace v1.24.0/go.mod h1:HPc3Xr/cOApsBI154IU0OI0HJexz+aw5uPdbs3UCjNU= | |
go.opentelemetry.io/proto/otlp v1.1.0 h1:2Di21piLrCqJ3U3eXGCTPHE9R8Nh+0uglSnOyxikMeI= | |
go.opentelemetry.io/proto/otlp v1.1.0/go.mod h1:GpBHCBWiqvVLDqmHZsoMM3C5ySeKTC7ej/RNTae6MdY= | |
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= | |
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= | |
golang.org/x/crypto v0.21.0 h1:X31++rzVUdKhX5sWmSOFZxx8UW/ldWx55cbf08iNAMA= | |
golang.org/x/crypto v0.21.0/go.mod h1:0BP7YvVV9gBbVKyeTG0Gyn+gZm94bibOW5BjDEYAOMs= | |
golang.org/x/net v0.22.0 h1:9sGLhx7iRIHEiX0oAJ3MRZMUCElJgy7Br1nO+AMN3Tc= | |
golang.org/x/net v0.22.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg= | |
golang.org/x/sync v0.6.0 h1:5BMeUDZ7vkXGfEr1x9B4bRcTH4lpkTkpdh0T/J+qjbQ= | |
golang.org/x/sync v0.6.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= | |
golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4= | |
golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= | |
golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= | |
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= | |
google.golang.org/genproto/googleapis/api v0.0.0-20240318140521-94a12d6c2237 h1:RFiFrvy37/mpSpdySBDrUdipW/dHwsRwh3J3+A9VgT4= | |
google.golang.org/genproto/googleapis/api v0.0.0-20240318140521-94a12d6c2237/go.mod h1:Z5Iiy3jtmioajWHDGFk7CeugTyHtPvMHA4UTmUkyalE= | |
google.golang.org/genproto/googleapis/rpc v0.0.0-20240318140521-94a12d6c2237 h1:NnYq6UN9ReLM9/Y01KWNOWyI5xQ9kbIms5GGJVwS/Yc= | |
google.golang.org/genproto/googleapis/rpc v0.0.0-20240318140521-94a12d6c2237/go.mod h1:WtryC6hu0hhx87FDGxWCDptyssuo68sk10vYjF+T9fY= | |
google.golang.org/grpc v1.62.1 h1:B4n+nfKzOICUXMgyrNd19h/I9oH0L1pizfk1d4zSgTk= | |
google.golang.org/grpc v1.62.1/go.mod h1:IWTG0VlJLCh1SkC58F7np9ka9mx/WNkjl4PGJaiq+QE= | |
google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI= | |
google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= | |
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= | |
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= | |
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= | |
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"context" | |
"flag" | |
"log" | |
"os" | |
"os/signal" | |
"github.com/jackc/pgx/v5" | |
"go.opentelemetry.io/otel" | |
) | |
func main() { | |
var dbConnString string | |
var otelTarget string | |
var serviceName string | |
var tracerName string | |
flag.StringVar(&dbConnString, "db-url", "", "Database Connection string") | |
flag.StringVar(&otelTarget, "otel-target", "localhost:4317", "Target of otel collector") | |
flag.StringVar(&serviceName, "service-name", "PostgreSQL", "Service Name for the generated spans") | |
flag.StringVar(&tracerName, "tracer-name", "pgtracing-tracer", "Name of the tracer") | |
flag.Parse() | |
ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt) | |
defer cancel() | |
fixedIdGenerator := &FixedIdGenerator{} | |
tracerShutdown, err := initTracerProvider(ctx, otelTarget, serviceName, fixedIdGenerator) | |
if err != nil { | |
log.Fatalf("Error when initialising otel provider: %v", err) | |
} | |
defer func() { | |
if err := tracerShutdown(ctx); err != nil { | |
log.Fatal("failed to shutdown TracerProvider: %w", err) | |
} | |
}() | |
conn, err := pgx.Connect(ctx, dbConnString) | |
if err != nil { | |
log.Fatalf("Error when connecting to PostgreSQL: %v", err) | |
} | |
defer conn.Close(ctx) | |
pgTracingSpans, err := fetchSpans(ctx, conn) | |
if err != nil { | |
log.Fatal(err) | |
} | |
tracer := otel.Tracer(tracerName) | |
for _, span := range pgTracingSpans { | |
forwardSpan(span, tracer, fixedIdGenerator) | |
} | |
log.Printf("Done!") | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"context" | |
"fmt" | |
"time" | |
"go.opentelemetry.io/otel" | |
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc" | |
"go.opentelemetry.io/otel/propagation" | |
"go.opentelemetry.io/otel/sdk/resource" | |
sdktrace "go.opentelemetry.io/otel/sdk/trace" | |
semconv "go.opentelemetry.io/otel/semconv/v1.17.0" | |
"go.opentelemetry.io/otel/trace" | |
"google.golang.org/grpc" | |
"google.golang.org/grpc/credentials/insecure" | |
) | |
type FixedIdGenerator struct { | |
FixedSpanID trace.SpanID | |
FixedTraceID trace.TraceID | |
} | |
func (f *FixedIdGenerator) NewSpanID(ctx context.Context, traceID trace.TraceID) trace.SpanID { | |
return f.FixedSpanID | |
} | |
func (f *FixedIdGenerator) NewIDs(ctx context.Context) (trace.TraceID, trace.SpanID) { | |
return f.FixedTraceID, f.FixedSpanID | |
} | |
func initTracerProvider(ctx context.Context, otelTarget string, serviceName string, fixedIdGenerator *FixedIdGenerator) (func(context.Context) error, error) { | |
res, err := resource.New(ctx, | |
resource.WithAttributes( | |
semconv.ServiceName(serviceName), | |
), | |
) | |
if err != nil { | |
return nil, fmt.Errorf("failed to create resource: %w", err) | |
} | |
ctx, cancel := context.WithTimeout(ctx, time.Second) | |
defer cancel() | |
conn, err := grpc.DialContext(ctx, otelTarget, | |
grpc.WithTransportCredentials(insecure.NewCredentials()), | |
grpc.WithBlock(), | |
) | |
if err != nil { | |
return nil, fmt.Errorf("failed to create gRPC connection to collector: %w", err) | |
} | |
// Set up the trace exporter | |
traceExporter, err := otlptracegrpc.New(ctx, otlptracegrpc.WithGRPCConn(conn)) | |
if err != nil { | |
return nil, fmt.Errorf("failed to create trace exporter: %w", err) | |
} | |
// Register the trace exporter with a TracerProvider, using a batch | |
// span processor to aggregate spans before export. | |
bsp := sdktrace.NewBatchSpanProcessor(traceExporter) | |
tracerProvider := sdktrace.NewTracerProvider( | |
sdktrace.WithSampler(sdktrace.AlwaysSample()), | |
sdktrace.WithResource(res), | |
sdktrace.WithSpanProcessor(bsp), | |
sdktrace.WithIDGenerator(fixedIdGenerator), | |
) | |
otel.SetTracerProvider(tracerProvider) | |
otel.SetTextMapPropagator(propagation.TraceContext{}) | |
return tracerProvider.Shutdown, nil | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"context" | |
"database/sql" | |
"fmt" | |
"log" | |
"strings" | |
"time" | |
"github.com/jackc/pgx/v5" | |
"go.opentelemetry.io/otel/attribute" | |
"go.opentelemetry.io/otel/trace" | |
) | |
type PgTracingSpan struct { | |
TraceId string `db:"trace_id"` | |
ParentId string `db:"parent_id"` | |
SpanId string `db:"span_id"` | |
QueryId int64 `db:"query_id"` | |
SpanType string `db:"span_type"` | |
SpanOperation string `db:"span_operation"` | |
Parameters sql.NullString `db:"parameters"` | |
SpanStart time.Time `db:"span_start"` | |
SpanEnd time.Time `db:"span_end"` | |
Pid int32 `db:"pid"` | |
SubxactCount int32 `db:"subxact_count"` | |
SqlErrorCode string `db:"sql_error_code"` | |
Rows sql.NullInt64 `db:"rows"` | |
PlanStartupCost sql.NullFloat64 `db:"plan_startup_cost"` | |
PlanTotalCost sql.NullFloat64 `db:"plan_total_cost"` | |
PlanRows sql.NullFloat64 `db:"plan_rows"` | |
PlanWidth sql.NullInt64 `db:"plan_width"` | |
SharedBlksHit sql.NullInt64 `db:"shared_blks_hit"` | |
SharedBlksRead sql.NullInt64 `db:"shared_blks_read"` | |
SharedBlksWritten sql.NullInt64 `db:"shared_blks_written"` | |
SharedBlksDirtied sql.NullInt64 `db:"shared_blks_dirtied"` | |
LocalBlksHit sql.NullInt64 `db:"local_blks_hit"` | |
LocalBlksRead sql.NullInt64 `db:"local_blks_read"` | |
LocalBlksWritten sql.NullInt64 `db:"local_blks_written"` | |
LocalBlksDirtied sql.NullInt64 `db:"local_blks_dirtied"` | |
BlkReadTime sql.NullFloat64 `db:"blk_read_time"` | |
BlkWriteTime sql.NullFloat64 `db:"blk_write_time"` | |
TempBlksRead sql.NullInt64 `db:"temp_blks_read"` | |
TempBlksWritten sql.NullInt64 `db:"temp_blks_written"` | |
TempBlksReadTime sql.NullFloat64 `db:"temp_blk_read_time"` | |
TempBlksWriteTime sql.NullFloat64 `db:"temp_blk_write_time"` | |
WalRecords sql.NullInt64 `db:"wal_records"` | |
WalFpi sql.NullInt64 `db:"wal_fpi"` | |
WalBytes sql.NullInt64 `db:"wal_bytes"` | |
JitFunctions sql.NullInt64 `db:"jit_functions"` | |
JitGenerationTime sql.NullFloat64 `db:"jit_generation_time"` | |
JitInliningTime sql.NullFloat64 `db:"jit_inlining_time"` | |
JitOptimizationTime sql.NullFloat64 `db:"jit_optimization_time"` | |
JitEmissionTime sql.NullFloat64 `db:"jit_emission_time"` | |
} | |
func setMetricIfValueFloat(attributes []attribute.KeyValue, key string, value sql.NullFloat64) []attribute.KeyValue { | |
if !value.Valid || value.Float64 == 0 { | |
return attributes | |
} | |
return append(attributes, attribute.Float64(key, value.Float64)) | |
} | |
func setMetricIfValue(attributes []attribute.KeyValue, key string, value sql.NullInt64) []attribute.KeyValue { | |
if !value.Valid || value.Int64 == 0 { | |
return attributes | |
} | |
return append(attributes, attribute.Int64(key, value.Int64)) | |
} | |
func fetchSpans(ctx context.Context, conn *pgx.Conn) ([]PgTracingSpan, error) { | |
query := `select | |
trace_id, parent_id, span_id, query_id, | |
span_type, span_operation, parameters, | |
span_start, span_end, | |
pid, subxact_count, sql_error_code, rows, | |
plan_startup_cost, plan_total_cost, plan_rows, plan_width, | |
shared_blks_hit, shared_blks_read, shared_blks_dirtied, shared_blks_written, | |
local_blks_hit, local_blks_read, local_blks_dirtied, local_blks_written, | |
blk_read_time, blk_write_time, | |
temp_blks_read, temp_blks_written, temp_blk_read_time, temp_blk_write_time, | |
wal_records, wal_fpi, wal_bytes, | |
jit_functions, jit_generation_time, jit_inlining_time, jit_optimization_time, jit_emission_time | |
from pg_tracing_consume_spans order by span_start;` | |
rows, err := conn.Query(ctx, query) | |
if err != nil { | |
return nil, err | |
} | |
spans, err := pgx.CollectRows(rows, pgx.RowToStructByName[PgTracingSpan]) | |
if err != nil { | |
return nil, fmt.Errorf("Error fetching pg_tracing spans %w", err) | |
} | |
return spans, nil | |
} | |
func generate_meta_parameters(meta map[string]string, parameterString string) { | |
parameters := strings.Split(parameterString, ", ") | |
for _, param := range parameters { | |
paramSlice := strings.SplitN(param, "=", 2) | |
meta[fmt.Sprintf("parameters.%s", paramSlice[0])] = paramSlice[1] | |
} | |
} | |
func forwardSpan(span PgTracingSpan, tracer trace.Tracer, fixedIdGenerator *FixedIdGenerator) error { | |
log.Printf("traceId: %s, parentId: %s, spanId: %s, span_operation: %s, start: %s, end: %s", | |
span.TraceId, span.ParentId, span.SpanId, span.SpanOperation, span.SpanStart, span.SpanEnd) | |
traceId, err := trace.TraceIDFromHex(span.TraceId) | |
if err != nil { | |
return fmt.Errorf("Failed parsing traceId %w", err) | |
} | |
parentId, err := trace.SpanIDFromHex(span.ParentId) | |
if err != nil { | |
return fmt.Errorf("Failed parsing parentId %w", err) | |
} | |
spanId, err := trace.SpanIDFromHex(span.SpanId) | |
if err != nil { | |
return fmt.Errorf("Failed parsing spanId %w", err) | |
} | |
attributes := make([]attribute.KeyValue, 0) | |
setMetricIfValue(attributes, "rows", span.Rows) | |
attributes = append(attributes, attribute.Int("pid", int(span.Pid))) | |
attributes = append(attributes, attribute.Int("subxact_count", int(span.SubxactCount))) | |
attributes = setMetricIfValue(attributes, "block.shared.hit", span.SharedBlksHit) | |
attributes = setMetricIfValue(attributes, "block.shared.read", span.SharedBlksRead) | |
attributes = setMetricIfValue(attributes, "block.shared.dirtied", span.SharedBlksDirtied) | |
attributes = setMetricIfValue(attributes, "block.shared.written", span.SharedBlksWritten) | |
attributes = setMetricIfValue(attributes, "block.local.hit", span.LocalBlksHit) | |
attributes = setMetricIfValue(attributes, "block.local.read", span.LocalBlksRead) | |
attributes = setMetricIfValue(attributes, "block.local.dirtied", span.LocalBlksDirtied) | |
attributes = setMetricIfValue(attributes, "block.local.written", span.LocalBlksWritten) | |
attributes = setMetricIfValueFloat(attributes, "block.read_time", span.BlkReadTime) | |
attributes = setMetricIfValueFloat(attributes, "block.write_time", span.BlkWriteTime) | |
attributes = setMetricIfValue(attributes, "block.temp.read", span.TempBlksRead) | |
attributes = setMetricIfValue(attributes, "block.temp.written", span.TempBlksWritten) | |
attributes = setMetricIfValueFloat(attributes, "block.temp.read_time", span.TempBlksReadTime) | |
attributes = setMetricIfValueFloat(attributes, "block.temp.write_time", span.TempBlksWriteTime) | |
attributes = setMetricIfValue(attributes, "wal.records", span.WalRecords) | |
attributes = setMetricIfValue(attributes, "wal.fpi", span.WalFpi) | |
attributes = setMetricIfValue(attributes, "wal.bytes", span.WalBytes) | |
attributes = setMetricIfValueFloat(attributes, "plan.startup_cost", span.PlanStartupCost) | |
attributes = setMetricIfValueFloat(attributes, "plan.total_cost", span.PlanTotalCost) | |
attributes = setMetricIfValueFloat(attributes, "plan.rows", span.PlanRows) | |
attributes = setMetricIfValue(attributes, "plan.width", span.PlanWidth) | |
attributes = setMetricIfValue(attributes, "jit.functions", span.JitFunctions) | |
attributes = setMetricIfValueFloat(attributes, "jit.generation_time", span.JitGenerationTime) | |
attributes = setMetricIfValueFloat(attributes, "jit.inlining_time", span.JitInliningTime) | |
attributes = setMetricIfValueFloat(attributes, "jit.optimization_time", span.JitOptimizationTime) | |
attributes = setMetricIfValueFloat(attributes, "jit.emission_time", span.JitEmissionTime) | |
if span.SqlErrorCode != "00000" { | |
attributes = append(attributes, attribute.String("error.msg", "Query error")) | |
attributes = append(attributes, attribute.String("error.msg", span.SqlErrorCode)) | |
} | |
if span.Parameters.Valid { | |
// We're expecting something like | |
// $1 = '1', $2 = '2' | |
parameters := strings.Split(span.Parameters.String, ", ") | |
for _, param := range parameters { | |
paramSlice := strings.SplitN(param, "=", 2) | |
attributes = append(attributes, attribute.String(fmt.Sprintf("parameters.%s", paramSlice[0]), paramSlice[1])) | |
} | |
} | |
// Build the parent span context | |
psc := trace.SpanContext{} | |
psc = psc.WithTraceID(traceId) | |
psc = psc.WithSpanID(parentId) | |
ctx := trace.ContextWithSpanContext(context.Background(), psc) | |
// Modify the fixed spanID generator before starting the span | |
fixedIdGenerator.FixedSpanID = spanId | |
startOptions := []trace.SpanStartOption{ | |
trace.WithTimestamp(span.SpanStart), | |
trace.WithAttributes(attributes...), | |
trace.WithSpanKind(trace.SpanKindServer), | |
} | |
_, otelSpan := tracer.Start(ctx, span.SpanOperation, startOptions...) | |
// Set the end span | |
endOptions := []trace.SpanEndOption{ | |
trace.WithTimestamp(span.SpanEnd), | |
} | |
otelSpan.End(endOptions...) | |
return nil | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment