Skip to content

Instantly share code, notes, and snippets.

@bonnefoa

bonnefoa/go.mod Secret

Last active December 6, 2023 07:20
Show Gist options
  • Save bonnefoa/6ed24520bdac026d6a6a6992d308bd50 to your computer and use it in GitHub Desktop.
Save bonnefoa/6ed24520bdac026d6a6a6992d308bd50 to your computer and use it in GitHub Desktop.
pg-tracing-forwarder
module github.com/datadog/pg-tracing-forwarder
go 1.20
require (
github.com/DataDog/datadog-agent/pkg/trace v0.45.0-rc.4
github.com/jackc/pgx/v5 v5.3.1
)
require (
github.com/gogo/protobuf v1.3.2 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect
github.com/philhofer/fwd v1.1.1 // indirect
github.com/tinylib/msgp v1.1.6 // indirect
golang.org/x/crypto v0.7.0 // indirect
golang.org/x/net v0.10.0 // indirect
golang.org/x/text v0.9.0 // indirect
)
github.com/DataDog/datadog-agent/pkg/trace v0.45.0-rc.4 h1:6jOc79Fze9kJMBezhVHI1xUKQHD31NSuNzsHw+k7NwI=
github.com/DataDog/datadog-agent/pkg/trace v0.45.0-rc.4/go.mod h1:X7hN9UT7p45Y5bVN3fs54wQh9iry9oNBYJ+yOV/PbJI=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg=
github.com/google/gofuzz v1.2.0 h1:xRy4A+RhZaiKjJ1bPfwQ8sedCA+YS2YcCHW6ec7JMi0=
github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM=
github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg=
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a h1:bbPeKD0xmW/Y25WS6cokEszi5g+S0QxI/d45PkRi7Nk=
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM=
github.com/jackc/pgx/v5 v5.3.1 h1:Fcr8QJ1ZeLi5zsPZqQeUZhNhxfkkKBOgJuYkJHoBOtU=
github.com/jackc/pgx/v5 v5.3.1/go.mod h1:t3JDKnCBlYIc0ewLF0Q7B8MXmoIaBOZj/ic7iHozM/8=
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/philhofer/fwd v1.1.1 h1:GdGcTjf5RNAxwS4QLsiMzJYj5KEvPJD3Abr261yRQXQ=
github.com/philhofer/fwd v1.1.1/go.mod h1:gk3iGcWd9+svBvR0sR+KPcfE+RNWozjowpeBVG3ZVNU=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.2 h1:+h33VjcLVPDHtOdpUCuF+7gSuG3yGIftsP1YvFihtJ8=
github.com/tinylib/msgp v1.1.6 h1:i+SbKraHhnrf9M5MYmvQhFnbLhAXSDWF8WWsuyRdocw=
github.com/tinylib/msgp v1.1.6/go.mod h1:75BAfg2hauQhs3qedfdDZmWAPcFMAvJE5b9rGOMufyw=
github.com/vmihailenco/msgpack/v4 v4.3.12 h1:07s4sz9IReOgdikxLTKNbBdqDMLsjPKXwvCazn8G65U=
github.com/vmihailenco/tagparser v0.1.2 h1:gnjoVuB/kljJ5wICEEOpx98oXMWPLj22G67Vbd1qPqc=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.7.0 h1:AvwMYaRytfdeVt3u6mLaxYtErKYjxA2OXjJ1HHq6t3A=
golang.org/x/crypto v0.7.0/go.mod h1:pYwdfH91IfpZVANVyUOhSIPZaFoJGxTFbZhFTx+dXZU=
golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.10.0 h1:X2//UzNDwYmtCLn7To6G58Wr6f5ahEAQgKNzv9Y951M=
golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.9.0 h1:2sjJmO8cDvYveuX97RDLsxlyUxLl+GHoLxBiRdHllBE=
golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
golang.org/x/tools v0.0.0-20201022035929-9cf592e881e9/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/appengine v1.6.7 h1:FZR1q0exgwxzPzp/aF+VccGrSfxfPpkBqjIIEq3ru6c=
google.golang.org/protobuf v1.30.0 h1:kPPoIgf3TsEvrm0PFe15JQ+570QVxYzEvvHqChK+cng=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
package main
import (
"bytes"
"context"
"database/sql"
"encoding/json"
"flag"
"fmt"
"log"
"net/http"
"os"
"strings"
"time"
"github.com/DataDog/datadog-agent/pkg/trace/pb"
"github.com/jackc/pgx/v5"
)
// fatalIf terminates the program through log.Fatalf when err is non-nil.
// A nil err is a no-op.
func fatalIf(err error) {
	if err == nil {
		return
	}
	log.Fatalf("Error: %v", err)
}
// setMetricIfValue stores value under key in metrics, converted to float64,
// but only when value is not SQL NULL. NULL values leave metrics untouched.
func setMetricIfValue(metrics map[string]float64, key string, value sql.NullInt64) {
	if value.Valid {
		metrics[key] = float64(value.Int64)
	}
}
// setMetricIfValueFloat stores value under key in metrics, but only when
// value is not SQL NULL. NULL values leave metrics untouched.
func setMetricIfValueFloat(metrics map[string]float64, key string, value sql.NullFloat64) {
	if value.Valid {
		metrics[key] = value.Float64
	}
}
// BlockStats groups the nullable block counters (hit, read, written,
// dirtied) for one category of blocks. Each field is NULL-aware so that a
// missing counter can be distinguished from a zero one.
type BlockStats struct {
	hit, read, written, dirtied sql.NullInt64
}
// BlockTime groups the nullable read and write timings for one category of
// blocks.
type BlockTime struct {
	readTime, writeTime sql.NullFloat64
}
// generate_meta_parameters parses a comma-separated parameter list such as
// "$1 = '1', $2 = '2'" and stores every entry in meta under the key
// "parameters.<name>".
//
// Entries are split on ", " and then on the first "=", so a value may itself
// contain "=". Whitespace around the name and the value is trimmed so that
// the "name = value" form does not produce keys with trailing spaces.
// Entries without "=" (including an empty parameterString) or with an empty
// name are skipped instead of panicking.
func generate_meta_parameters(meta map[string]string, parameterString string) {
	fmt.Printf("Source parameter: %s\n", parameterString)
	for _, param := range strings.Split(parameterString, ", ") {
		paramSlice := strings.SplitN(param, "=", 2)
		if len(paramSlice) != 2 {
			// No "=" separator: nothing sensible to record for this entry.
			fmt.Printf("Skipping malformed parameter %q\n", param)
			continue
		}
		fmt.Printf("Splitting parameter %s\n", paramSlice)
		name := strings.TrimSpace(paramSlice[0])
		if name == "" {
			fmt.Printf("Skipping malformed parameter %q\n", param)
			continue
		}
		meta[fmt.Sprintf("parameters.%s", name)] = strings.TrimSpace(paramSlice[1])
	}
}
// fetchSpans reads every row of pg_tracing_consume_spans over conn, ordered
// by span_start, and converts each row into a pb.Span.
//
// The returned map groups the spans by trace ID. Nullable columns are only
// copied into the span metrics when they hold a value. A sql_error_code
// other than "00000" marks the span as an error and records the code in the
// span metadata.
//
// Any query, scan or row-iteration error terminates the program.
func fetchSpans(ctx context.Context, conn *pgx.Conn) map[uint64][]pb.Span {
	query := `select
trace_id, parent_id, span_id,
span_type, span_operation, deparse_info, parameters,
span_start, duration,
startup,
pid, subxact_count,
sql_error_code,
rows,
plan_startup_cost, plan_total_cost, plan_rows, plan_width,
shared_blks_hit, shared_blks_read, shared_blks_dirtied, shared_blks_written,
local_blks_hit, local_blks_read, local_blks_dirtied, local_blks_written,
blk_read_time, blk_write_time,
temp_blks_read, temp_blks_written, temp_blk_read_time, temp_blk_write_time,
wal_records, wal_fpi, wal_bytes,
jit_functions, jit_generation_time, jit_inlining_time, jit_optimization_time, jit_emission_time
from pg_tracing_consume_spans order by span_start;`
	fmt.Printf("Query: %s", query)
	rows, err := conn.Query(ctx, query)
	fatalIf(err)
	// Release the result set even if iteration stops early.
	defer rows.Close()

	traceToSpans := make(map[uint64][]pb.Span)
	for rows.Next() {
		var traceId int64
		var parentId int64
		var spanId int64
		var span_type string
		var span_operation string
		var deparse_info sql.NullString
		var parameters sql.NullString
		var span_start time.Time
		var duration int64
		var startup sql.NullInt64
		var pid int32
		var subxact_count int32
		var sql_error_code string
		var rowNumber sql.NullInt64
		var planStartupCost sql.NullFloat64
		var planTotalCost sql.NullFloat64
		var planRows sql.NullFloat64
		var planWidth sql.NullInt64
		var sharedBlks BlockStats
		var localBlks BlockStats
		var blkTime BlockTime
		var tempBlks BlockStats
		var tempBlkTime BlockTime
		var wal_records sql.NullInt64
		var wal_fpi sql.NullInt64
		var wal_bytes sql.NullInt64
		var jit_functions sql.NullInt64
		var jit_generation_time sql.NullFloat64
		var jit_inlining_time sql.NullFloat64
		var jit_optimization_time sql.NullFloat64
		var jit_emission_time sql.NullFloat64

		// The destination order must match the column order of the query.
		if err := rows.Scan(&traceId, &parentId, &spanId,
			&span_type, &span_operation, &deparse_info, &parameters,
			&span_start, &duration, &startup, &pid, &subxact_count, &sql_error_code, &rowNumber,
			&planStartupCost, &planTotalCost, &planRows, &planWidth,
			&sharedBlks.hit, &sharedBlks.read, &sharedBlks.dirtied, &sharedBlks.written,
			&localBlks.hit, &localBlks.read, &localBlks.dirtied, &localBlks.written,
			&blkTime.readTime, &blkTime.writeTime,
			&tempBlks.read, &tempBlks.written,
			&tempBlkTime.readTime, &tempBlkTime.writeTime,
			&wal_records, &wal_fpi, &wal_bytes,
			&jit_functions, &jit_generation_time, &jit_inlining_time, &jit_optimization_time, &jit_emission_time); err != nil {
			log.Fatal(err)
		}
		log.Printf("traceId: %d, parentId: %d, spanId: %d, span_operation: %s, start: %s, duration: %d", traceId, parentId, spanId, span_operation, span_start, duration)

		// The IDs are scanned as signed 64-bit integers and reinterpreted
		// as the unsigned IDs used by pb.Span.
		utrace_id := uint64(traceId)
		uparent_id := uint64(parentId)
		uspan_id := uint64(spanId)

		metrics := make(map[string]float64)
		setMetricIfValue(metrics, "rows", rowNumber)
		// TODO: Use span events when available
		setMetricIfValue(metrics, "first_tuple", startup)
		metrics["pid"] = float64(pid)
		metrics["subxact_count"] = float64(subxact_count)
		setMetricIfValue(metrics, "block.shared.hit", sharedBlks.hit)
		setMetricIfValue(metrics, "block.shared.read", sharedBlks.read)
		setMetricIfValue(metrics, "block.shared.dirtied", sharedBlks.dirtied)
		setMetricIfValue(metrics, "block.shared.written", sharedBlks.written)
		setMetricIfValue(metrics, "block.local.hit", localBlks.hit)
		setMetricIfValue(metrics, "block.local.read", localBlks.read)
		setMetricIfValue(metrics, "block.local.dirtied", localBlks.dirtied)
		setMetricIfValue(metrics, "block.local.written", localBlks.written)
		setMetricIfValueFloat(metrics, "block.read_time", blkTime.readTime)
		setMetricIfValueFloat(metrics, "block.write_time", blkTime.writeTime)
		setMetricIfValue(metrics, "block.temp.read", tempBlks.read)
		setMetricIfValue(metrics, "block.temp.written", tempBlks.written)
		setMetricIfValueFloat(metrics, "block.temp.read_time", tempBlkTime.readTime)
		setMetricIfValueFloat(metrics, "block.temp.write_time", tempBlkTime.writeTime)
		setMetricIfValue(metrics, "wal.records", wal_records)
		setMetricIfValue(metrics, "wal.fpi", wal_fpi)
		setMetricIfValue(metrics, "wal.bytes", wal_bytes)
		setMetricIfValueFloat(metrics, "plan.startup_cost", planStartupCost)
		setMetricIfValueFloat(metrics, "plan.total_cost", planTotalCost)
		setMetricIfValueFloat(metrics, "plan.rows", planRows)
		setMetricIfValue(metrics, "plan.width", planWidth)
		setMetricIfValue(metrics, "jit.functions", jit_functions)
		setMetricIfValueFloat(metrics, "jit.generation_time", jit_generation_time)
		setMetricIfValueFloat(metrics, "jit.inlining_time", jit_inlining_time)
		setMetricIfValueFloat(metrics, "jit.optimization_time", jit_optimization_time)
		setMetricIfValueFloat(metrics, "jit.emission_time", jit_emission_time)

		meta := make(map[string]string)
		meta["span.kind"] = "server"
		if parameters.Valid {
			// We're expecting something like
			// $1 = '1', $2 = '2'
			generate_meta_parameters(meta, parameters.String)
		}

		// Named errFlag rather than error to avoid shadowing the builtin type.
		errFlag := int32(0)
		if sql_error_code != "00000" {
			errFlag = 1
			meta["error.msg"] = "Query error"
			meta["error.code"] = sql_error_code
			meta["error.stack"] = ""
		}

		resource := span_operation
		if deparse_info.Valid {
			resource = fmt.Sprintf("%s %s", span_operation, deparse_info.String)
		}

		span := pb.Span{
			Service:  "Postgresql-server",
			Name:     span_type,
			Resource: resource,
			TraceID:  utrace_id,
			SpanID:   uspan_id,
			ParentID: uparent_id,
			Meta:     meta,
			Type:     "db",
			Start:    span_start.UnixNano(),
			Duration: duration,
			Metrics:  metrics,
			Error:    errFlag,
		}
		log.Printf("Built span %v", span)
		// append on a missing (nil) entry creates the slice as needed.
		traceToSpans[utrace_id] = append(traceToSpans[utrace_id], span)
	}
	// rows.Next returns false both at end of data and on failure; without
	// this check a mid-stream error would silently truncate the result.
	fatalIf(rows.Err())
	return traceToSpans
}
// sendSpans serialises the spans of every trace in traceToSpans as a JSON
// array of traces and PUTs it to the URL given by the DATADOG_STATS_URL
// environment variable.
//
// When DATADOG_STATS_URL is unset or empty nothing is sent. Before sending,
// the JSON payload is also written to /tmp/spans.json. Any marshalling,
// file or HTTP error terminates the program.
func sendSpans(traceToSpans map[uint64][]pb.Span) {
	// Kept non-nil so that an empty input encodes as "[]" rather than "null".
	traces := make([][]pb.Span, 0, len(traceToSpans))
	for _, spans := range traceToSpans {
		traces = append(traces, spans)
	}
	jsonBytes, err := json.Marshal(traces)
	fatalIf(err)

	statsURL := os.Getenv("DATADOG_STATS_URL")
	if statsURL == "" {
		return
	}

	client := &http.Client{}
	log.Printf("Sending spans %v to %s", string(jsonBytes), statsURL)
	err = os.WriteFile("/tmp/spans.json", jsonBytes, 0644)
	fatalIf(err)

	buf := bytes.NewBuffer(jsonBytes)
	req, err := http.NewRequest("PUT", statsURL, buf)
	fatalIf(err)
	response, err := client.Do(req)
	fatalIf(err)
	// The body must be closed or the underlying connection is leaked; this
	// function is called repeatedly when listening for notifications.
	defer response.Body.Close()
	log.Printf("Answer: %v", response)
}
// main connects to the database named by the DATABASE_URL environment
// variable and forwards the available spans.
//
// Without -notify-channel the spans are fetched and sent once. With it, the
// program issues LISTEN on that channel and fetches and sends spans after
// every notification, looping until an error terminates the process.
func main() {
	channel := flag.String("notify-channel", "", "Channel to listen for pg_tracing notification")
	flag.Parse()

	// DATABASE_URL example: "postgres://username:password@localhost:5432/database_name"
	ctx := context.Background()
	conn, err := pgx.Connect(ctx, os.Getenv("DATABASE_URL"))
	fatalIf(err)
	defer conn.Close(ctx)

	// One-shot mode: no channel requested.
	if *channel == "" {
		sendSpans(fetchSpans(ctx, conn))
		return
	}

	// Listen mode: forward spans each time a notification arrives.
	_, err = conn.Exec(ctx, "listen "+*channel)
	fatalIf(err)
	for {
		event, err := conn.WaitForNotification(context.Background())
		fatalIf(err)
		fmt.Println("PID:", event.PID, "Channel:", event.Channel, "Payload:", event.Payload)
		sendSpans(fetchSpans(ctx, conn))
	}
}
DATADOG_STATS_URL=http://localhost:8126/v0.3/traces PGUSER=postgres PGHOST=/tmp ./pg-tracing-forwarder
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment