-
-
Save bonnefoa/6ed24520bdac026d6a6a6992d308bd50 to your computer and use it in GitHub Desktop.
pg-tracing-forwarder
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
module github.com/datadog/pg-tracing-forwarder | |
go 1.20 | |
require ( | |
github.com/DataDog/datadog-agent/pkg/trace v0.45.0-rc.4 | |
github.com/jackc/pgx/v5 v5.3.1 | |
) | |
require ( | |
github.com/gogo/protobuf v1.3.2 // indirect | |
github.com/jackc/pgpassfile v1.0.0 // indirect | |
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect | |
github.com/philhofer/fwd v1.1.1 // indirect | |
github.com/tinylib/msgp v1.1.6 // indirect | |
golang.org/x/crypto v0.7.0 // indirect | |
golang.org/x/net v0.10.0 // indirect | |
golang.org/x/text v0.9.0 // indirect | |
) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
github.com/DataDog/datadog-agent/pkg/trace v0.45.0-rc.4 h1:6jOc79Fze9kJMBezhVHI1xUKQHD31NSuNzsHw+k7NwI= | |
github.com/DataDog/datadog-agent/pkg/trace v0.45.0-rc.4/go.mod h1:X7hN9UT7p45Y5bVN3fs54wQh9iry9oNBYJ+yOV/PbJI= | |
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= | |
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= | |
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= | |
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= | |
github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg= | |
github.com/google/gofuzz v1.2.0 h1:xRy4A+RhZaiKjJ1bPfwQ8sedCA+YS2YcCHW6ec7JMi0= | |
github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= | |
github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= | |
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a h1:bbPeKD0xmW/Y25WS6cokEszi5g+S0QxI/d45PkRi7Nk= | |
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM= | |
github.com/jackc/pgx/v5 v5.3.1 h1:Fcr8QJ1ZeLi5zsPZqQeUZhNhxfkkKBOgJuYkJHoBOtU= | |
github.com/jackc/pgx/v5 v5.3.1/go.mod h1:t3JDKnCBlYIc0ewLF0Q7B8MXmoIaBOZj/ic7iHozM/8= | |
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= | |
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= | |
github.com/philhofer/fwd v1.1.1 h1:GdGcTjf5RNAxwS4QLsiMzJYj5KEvPJD3Abr261yRQXQ= | |
github.com/philhofer/fwd v1.1.1/go.mod h1:gk3iGcWd9+svBvR0sR+KPcfE+RNWozjowpeBVG3ZVNU= | |
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= | |
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= | |
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= | |
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= | |
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= | |
github.com/stretchr/testify v1.8.2 h1:+h33VjcLVPDHtOdpUCuF+7gSuG3yGIftsP1YvFihtJ8= | |
github.com/tinylib/msgp v1.1.6 h1:i+SbKraHhnrf9M5MYmvQhFnbLhAXSDWF8WWsuyRdocw= | |
github.com/tinylib/msgp v1.1.6/go.mod h1:75BAfg2hauQhs3qedfdDZmWAPcFMAvJE5b9rGOMufyw= | |
github.com/vmihailenco/msgpack/v4 v4.3.12 h1:07s4sz9IReOgdikxLTKNbBdqDMLsjPKXwvCazn8G65U= | |
github.com/vmihailenco/tagparser v0.1.2 h1:gnjoVuB/kljJ5wICEEOpx98oXMWPLj22G67Vbd1qPqc= | |
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= | |
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= | |
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= | |
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= | |
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= | |
golang.org/x/crypto v0.7.0 h1:AvwMYaRytfdeVt3u6mLaxYtErKYjxA2OXjJ1HHq6t3A= | |
golang.org/x/crypto v0.7.0/go.mod h1:pYwdfH91IfpZVANVyUOhSIPZaFoJGxTFbZhFTx+dXZU= | |
golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= | |
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= | |
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= | |
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= | |
golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= | |
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= | |
golang.org/x/net v0.10.0 h1:X2//UzNDwYmtCLn7To6G58Wr6f5ahEAQgKNzv9Y951M= | |
golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= | |
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= | |
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= | |
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= | |
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= | |
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= | |
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= | |
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= | |
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= | |
golang.org/x/text v0.9.0 h1:2sjJmO8cDvYveuX97RDLsxlyUxLl+GHoLxBiRdHllBE= | |
golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= | |
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= | |
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= | |
golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= | |
golang.org/x/tools v0.0.0-20201022035929-9cf592e881e9/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= | |
golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= | |
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= | |
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= | |
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= | |
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= | |
google.golang.org/appengine v1.6.7 h1:FZR1q0exgwxzPzp/aF+VccGrSfxfPpkBqjIIEq3ru6c= | |
google.golang.org/protobuf v1.30.0 h1:kPPoIgf3TsEvrm0PFe15JQ+570QVxYzEvvHqChK+cng= | |
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= | |
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= | |
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"bytes" | |
"context" | |
"database/sql" | |
"encoding/json" | |
"flag" | |
"fmt" | |
"log" | |
"net/http" | |
"os" | |
"strings" | |
"time" | |
"github.com/DataDog/datadog-agent/pkg/trace/pb" | |
"github.com/jackc/pgx/v5" | |
) | |
// fatalIf terminates the program through log.Fatalf when err is non-nil.
// A nil err is a no-op, so calls can be placed directly after any fallible
// operation.
func fatalIf(err error) {
	if err == nil {
		return
	}
	log.Fatalf("Error: %v", err)
}
// setMetricIfValue records a nullable integer column in metrics under key,
// converted to float64. A NULL value (Valid == false) leaves the map
// untouched.
func setMetricIfValue(metrics map[string]float64, key string, value sql.NullInt64) {
	if value.Valid {
		metrics[key] = float64(value.Int64)
	}
}
// setMetricIfValueFloat records a nullable float column in metrics under key.
// A NULL value (Valid == false) leaves the map untouched.
func setMetricIfValueFloat(metrics map[string]float64, key string, value sql.NullFloat64) {
	if value.Valid {
		metrics[key] = value.Float64
	}
}
// BlockStats groups the nullable block counters (hit, read, written,
// dirtied) scanned for one class of blocks. fetchSpans uses one value each
// for the shared, local and temp block columns.
type BlockStats struct {
	hit, read, written, dirtied sql.NullInt64
}
// BlockTime groups the nullable read and write timings scanned alongside a
// set of block counters (the blk_*_time and temp_blk_*_time columns in
// fetchSpans).
type BlockTime struct {
	readTime, writeTime sql.NullFloat64
}
// generate_meta_parameters parses a query-parameter string such as
//
//	$1 = '1', $2 = '2'
//
// and stores each pair in meta under the key "parameters.<name>".
//
// Entries are separated by ", " and split on their first "=", so a value may
// itself contain "=". Whitespace around the name and the value is trimmed so
// that the spaced form above yields the key "parameters.$1" rather than
// "parameters.$1 ". Entries without "=" (including an empty parameterString)
// are skipped instead of panicking on a missing value.
//
// NOTE(review): a value that itself contains ", " is still split into
// several entries; the trailing fragments are dropped by the rule above.
func generate_meta_parameters(meta map[string]string, parameterString string) {
	fmt.Printf("Source parameter: %s\n", parameterString)
	for _, param := range strings.Split(parameterString, ", ") {
		name, value, found := strings.Cut(param, "=")
		if !found {
			// Nothing to record: there is no value part for this entry.
			continue
		}
		name = strings.TrimSpace(name)
		value = strings.TrimSpace(value)
		fmt.Printf("Splitting parameter %s\n", []string{name, value})
		meta["parameters."+name] = value
	}
}
func fetchSpans(ctx context.Context, conn *pgx.Conn) map[uint64][]pb.Span { | |
query := `select | |
trace_id, parent_id, span_id, | |
span_type, span_operation, deparse_info, parameters, | |
span_start, duration, | |
startup, | |
pid, subxact_count, | |
sql_error_code, | |
rows, | |
plan_startup_cost, plan_total_cost, plan_rows, plan_width, | |
shared_blks_hit, shared_blks_read, shared_blks_dirtied, shared_blks_written, | |
local_blks_hit, local_blks_read, local_blks_dirtied, local_blks_written, | |
blk_read_time, blk_write_time, | |
temp_blks_read, temp_blks_written, temp_blk_read_time, temp_blk_write_time, | |
wal_records, wal_fpi, wal_bytes, | |
jit_functions, jit_generation_time, jit_inlining_time, jit_optimization_time, jit_emission_time | |
from pg_tracing_consume_spans order by span_start;` | |
fmt.Printf("Query: %s", query) | |
rows, err := conn.Query(ctx, query) | |
fatalIf(err) | |
traceToSpans := make(map[uint64][]pb.Span, 0) | |
for rows.Next() { | |
var traceId int64 | |
var parentId int64 | |
var spanId int64 | |
var span_type string | |
var span_operation string | |
var deparse_info sql.NullString | |
var parameters sql.NullString | |
var span_start time.Time | |
var duration int64 | |
var startup sql.NullInt64 | |
var pid int32 | |
var subxact_count int32 | |
var sql_error_code string | |
var rowNumber sql.NullInt64 | |
var planStartupCost sql.NullFloat64 | |
var planTotalCost sql.NullFloat64 | |
var planRows sql.NullFloat64 | |
var planWidth sql.NullInt64 | |
var sharedBlks BlockStats | |
var localBlks BlockStats | |
var blkTime BlockTime | |
var tempBlks BlockStats | |
var tempBlkTime BlockTime | |
var wal_records sql.NullInt64 | |
var wal_fpi sql.NullInt64 | |
var wal_bytes sql.NullInt64 | |
var jit_functions sql.NullInt64 | |
var jit_generation_time sql.NullFloat64 | |
var jit_inlining_time sql.NullFloat64 | |
var jit_optimization_time sql.NullFloat64 | |
var jit_emission_time sql.NullFloat64 | |
if err := rows.Scan(&traceId, &parentId, &spanId, | |
&span_type, &span_operation, &deparse_info, ¶meters, | |
&span_start, &duration, &startup, &pid, &subxact_count, &sql_error_code, &rowNumber, | |
&planStartupCost, &planTotalCost, &planRows, &planWidth, | |
&sharedBlks.hit, &sharedBlks.read, &sharedBlks.dirtied, &sharedBlks.written, | |
&localBlks.hit, &localBlks.read, &localBlks.dirtied, &localBlks.written, | |
&blkTime.readTime, &blkTime.writeTime, | |
&tempBlks.read, &tempBlks.written, | |
&tempBlkTime.readTime, &tempBlkTime.writeTime, | |
&wal_records, &wal_fpi, &wal_bytes, | |
&jit_functions, &jit_generation_time, &jit_inlining_time, &jit_optimization_time, &jit_emission_time); err != nil { | |
log.Fatal(err) | |
} | |
log.Printf("traceId: %d, parentId: %d, spanId: %d, span_operation: %s, start: %s, duration: %d", traceId, parentId, spanId, span_operation, span_start, duration) | |
utrace_id := uint64(traceId) | |
uparent_id := uint64(parentId) | |
uspan_id := uint64(spanId) | |
metrics := make(map[string]float64, 0) | |
setMetricIfValue(metrics, "rows", rowNumber) | |
// TODO: Use span events when available | |
setMetricIfValue(metrics, "first_tuple", startup) | |
metrics["pid"] = float64(pid) | |
metrics["subxact_count"] = float64(subxact_count) | |
setMetricIfValue(metrics, "block.shared.hit", sharedBlks.hit) | |
setMetricIfValue(metrics, "block.shared.read", sharedBlks.read) | |
setMetricIfValue(metrics, "block.shared.dirtied", sharedBlks.dirtied) | |
setMetricIfValue(metrics, "block.shared.written", sharedBlks.written) | |
setMetricIfValue(metrics, "block.local.hit", localBlks.hit) | |
setMetricIfValue(metrics, "block.local.read", localBlks.read) | |
setMetricIfValue(metrics, "block.local.dirtied", localBlks.dirtied) | |
setMetricIfValue(metrics, "block.local.written", localBlks.written) | |
setMetricIfValueFloat(metrics, "block.read_time", blkTime.readTime) | |
setMetricIfValueFloat(metrics, "block.write_time", blkTime.writeTime) | |
setMetricIfValue(metrics, "block.temp.read", tempBlks.read) | |
setMetricIfValue(metrics, "block.temp.written", tempBlks.written) | |
setMetricIfValueFloat(metrics, "block.temp.read_time", tempBlkTime.readTime) | |
setMetricIfValueFloat(metrics, "block.temp.write_time", tempBlkTime.writeTime) | |
setMetricIfValue(metrics, "wal.records", wal_records) | |
setMetricIfValue(metrics, "wal.fpi", wal_fpi) | |
setMetricIfValue(metrics, "wal.bytes", wal_bytes) | |
setMetricIfValueFloat(metrics, "plan.startup_cost", planStartupCost) | |
setMetricIfValueFloat(metrics, "plan.total_cost", planTotalCost) | |
setMetricIfValueFloat(metrics, "plan.rows", planRows) | |
setMetricIfValue(metrics, "plan.width", planWidth) | |
setMetricIfValue(metrics, "jit.functions", jit_functions) | |
setMetricIfValueFloat(metrics, "jit.generation_time", jit_generation_time) | |
setMetricIfValueFloat(metrics, "jit.inlining_time", jit_inlining_time) | |
setMetricIfValueFloat(metrics, "jit.optimization_time", jit_optimization_time) | |
setMetricIfValueFloat(metrics, "jit.emission_time", jit_emission_time) | |
meta := make(map[string]string, 0) | |
meta["span.kind"] = "server" | |
if parameters.Valid { | |
// We're expecting something like | |
// $1 = '1', $2 = '2' | |
generate_meta_parameters(meta, parameters.String) | |
} | |
error := int32(0) | |
if sql_error_code != "00000" { | |
error = int32(1) | |
meta["error.msg"] = "Query error" | |
meta["error.code"] = sql_error_code | |
meta["error.stack"] = "" | |
} | |
spans, ok := traceToSpans[utrace_id] | |
if !ok { | |
spans = make([]pb.Span, 0) | |
} | |
resource := span_operation | |
if deparse_info.Valid { | |
resource = fmt.Sprintf("%s %s", span_operation, deparse_info.String) | |
} | |
service := "Postgresql-server" | |
span := pb.Span{ | |
Service: service, | |
Name: span_type, | |
Resource: resource, | |
TraceID: utrace_id, | |
SpanID: uspan_id, | |
ParentID: uparent_id, | |
Meta: meta, | |
Type: "db", | |
Start: span_start.UnixNano(), | |
Duration: duration, | |
Metrics: metrics, | |
Error: error, | |
} | |
log.Printf("Built span %v", span) | |
spans = append(spans, span) | |
traceToSpans[utrace_id] = spans | |
} | |
return traceToSpans | |
} | |
func sendSpans(traceToSpans map[uint64][]pb.Span) { | |
traces := make([][]pb.Span, 0) | |
for _, spans := range traceToSpans { | |
traces = append(traces, spans) | |
} | |
jsonBytes, err := json.Marshal(traces) | |
fatalIf(err) | |
statsURL := os.Getenv("DATADOG_STATS_URL") | |
if statsURL == "" { | |
return | |
} | |
client := &http.Client{} | |
log.Printf("Sending spans %v to %s", string(jsonBytes), statsURL) | |
err = os.WriteFile("/tmp/spans.json", jsonBytes, 0644) | |
fatalIf(err) | |
buf := bytes.NewBuffer(jsonBytes) | |
req, err := http.NewRequest("PUT", statsURL, buf) | |
fatalIf(err) | |
response, err := client.Do(req) | |
fatalIf(err) | |
log.Printf("Answer: %v", response) | |
} | |
func main() { | |
notify_channel := flag.String("notify-channel", "", "Channel to listen for pg_tracing notification") | |
flag.Parse() | |
// urlExample := "postgres://username:password@localhost:5432/database_name" | |
ctx := context.Background() | |
conn, err := pgx.Connect(ctx, os.Getenv("DATABASE_URL")) | |
fatalIf(err) | |
defer conn.Close(ctx) | |
if notify_channel != nil && *notify_channel != "" { | |
_, err = conn.Exec(ctx, fmt.Sprintf("listen %s", *notify_channel)) | |
fatalIf(err) | |
for { | |
notification, err := conn.WaitForNotification(context.Background()) | |
fatalIf(err) | |
fmt.Println("PID:", notification.PID, "Channel:", notification.Channel, "Payload:", notification.Payload) | |
traceToSpans := fetchSpans(ctx, conn) | |
sendSpans(traceToSpans) | |
} | |
} | |
traceToSpans := fetchSpans(ctx, conn) | |
sendSpans(traceToSpans) | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
DATADOG_STATS_URL=http://localhost:8126/v0.3/traces PGUSER=postgres PGHOST=/tmp ./pg-tracing-forwarder |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment