@srikanthccv
Created February 10, 2024 12:22
Script to compare pXX latency (p99 by default) computed three ways in SigNoz: from raw trace spans, from cumulative-temporality histogram metrics, and from delta-temporality histogram metrics.
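To run (assuming a ClickHouse server with the SigNoz traces and metrics schema is reachable at localhost:9000, and noting that the query time ranges are hardcoded to one 30-minute window): go run main.go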
package main

import (
    "context"
    "fmt"
    "time"

    "github.com/ClickHouse/clickhouse-go/v2"
)
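// tracesPxx computes the per-service pXX latency in milliseconds
// (durationNano / 1e6) for each one-minute bucket directly from raw spans.
// The time range is hardcoded as nanosecond epoch bounds and matches the
// millisecond bounds used in the metrics queries below.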
const tracesPxx = `SELECT
    serviceName,
    toStartOfInterval(timestamp, toIntervalSecond(60)) AS ts,
    quantile(%.2f)(durationNano) / 1000000. AS value
FROM signoz_traces.distributed_signoz_index_v2
WHERE (timestamp >= '1706788020000000000') AND (timestamp <= '1706789820000000000')
GROUP BY
    serviceName,
    ts
ORDER BY
    serviceName ASC,
    ts ASC
`
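// metricsPxxCumulative computes the same pXX from cumulative-temporality
// histogram buckets: take the max cumulative count per (fingerprint, le) per
// minute, turn adjacent points into a per-second rate with lagInFrame over a
// window ordered by time (emitting nan on counter resets and on the first
// point of each series, where the gap to the default lag value exceeds a
// day), sum the rates across series, and feed the per-le rates to
// histogramQuantile.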
const metricsPxxCumulative = `SELECT
    service_name,
    ts,
    histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), %.2f) AS value
FROM (
    SELECT
        service_name,
        le,
        ts,
        sum(rate_value) AS value
    FROM (
        SELECT
            service_name,
            le,
            ts,
            If((value - lagInFrame(value, 1, 0) OVER rate_window) < 0, nan,
               If((ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window) >= 86400, nan,
                  (value - lagInFrame(value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window))) AS rate_value
        FROM (
            SELECT
                fingerprint,
                service_name,
                le,
                toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), toIntervalSecond(60)) AS ts,
                max(value) AS value
            FROM signoz_metrics.distributed_samples_v2
            INNER JOIN (
                SELECT
                    JSONExtractString(labels, 'service_name') AS service_name,
                    JSONExtractString(labels, 'le') AS le,
                    fingerprint
                FROM signoz_metrics.time_series_v2
                WHERE (metric_name = 'signoz_latency_bucket') AND (temporality IN ['Cumulative', 'Unspecified']) AND (JSONExtractString(labels, 'deployment_environment') = 'prod')
            ) AS filtered_time_series USING (fingerprint)
            WHERE (metric_name = 'signoz_latency_bucket') AND (timestamp_ms >= 1706788020000) AND (timestamp_ms < 1706789820000)
            GROUP BY
                fingerprint,
                service_name,
                le,
                ts
            ORDER BY
                fingerprint ASC,
                service_name ASC,
                le ASC,
                ts ASC
        )
        WINDOW rate_window AS (PARTITION BY fingerprint, service_name, le ORDER BY fingerprint ASC, service_name ASC, le ASC, ts ASC)
    )
    WHERE isNaN(rate_value) = 0
    GROUP BY
        GROUPING SETS (
            (service_name, le, ts),
            (service_name, le))
    ORDER BY
        service_name ASC,
        le ASC,
        ts ASC
)
GROUP BY
    service_name,
    ts
ORDER BY
    service_name ASC,
    ts ASC
`
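// metricsPxxDelta does the same for delta-temporality buckets, which is much
// simpler: each sample already carries the increment for its interval, so the
// per-second rate for a one-minute bucket is just sum(value) / 60, with no
// window function or reset handling needed.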
const metricsPxxDelta = `SELECT
    service_name,
    ts,
    histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), %.2f) AS value
FROM (
    SELECT
        service_name,
        le,
        toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), toIntervalSecond(60)) AS ts,
        sum(value) / 60 AS value
    FROM signoz_metrics.distributed_samples_v2
    INNER JOIN (
        SELECT DISTINCT
            JSONExtractString(labels, 'service_name') AS service_name,
            JSONExtractString(labels, 'le') AS le,
            fingerprint
        FROM signoz_metrics.time_series_v2
        WHERE (metric_name = 'signoz_latency_bucket') AND (temporality IN ['Delta']) AND (JSONExtractString(labels, 'deployment_environment') = 'prod')
    ) AS filtered_time_series USING (fingerprint)
    WHERE (metric_name = 'signoz_latency_bucket') AND (timestamp_ms >= 1706788020000) AND (timestamp_ms < 1706789820000)
    GROUP BY
        service_name,
        le,
        ts
    ORDER BY
        service_name ASC,
        le ASC,
        ts ASC
)
GROUP BY
    service_name,
    ts
ORDER BY
    service_name ASC,
    ts ASC
`
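// The three scan loops in main below are identical except for the destination
// map. A minimal sketch of how they could be deduplicated; rowScanner is a
// hypothetical local interface covering just the clickhouse-go v2 driver.Rows
// methods used here, so main can stay as-is and this helper is optional.
type rowScanner interface {
    Next() bool
    Scan(dest ...any) error
    Close() error
    Err() error
}

// collect drains rows into dest, keyed by service name and bucket timestamp.
func collect(rows rowScanner, dest map[string]map[time.Time]float64) error {
    defer rows.Close()
    for rows.Next() {
        var (
            serviceName string
            ts          time.Time
            value       float64
        )
        if err := rows.Scan(&serviceName, &ts, &value); err != nil {
            return err
        }
        if _, ok := dest[serviceName]; !ok {
            dest[serviceName] = make(map[time.Time]float64)
        }
        dest[serviceName][ts] = value
    }
    return rows.Err()
}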
func main() {
    conn, err := clickhouse.Open(&clickhouse.Options{Addr: []string{"localhost:9000"}})
    if err != nil {
        panic(err)
    }
    defer conn.Close()

    tracesLatencyByServiceName := make(map[string]map[time.Time]float64)
    metricsCumulativeLatencyByServiceName := make(map[string]map[time.Time]float64)
    metricsDeltaLatencyByServiceName := make(map[string]map[time.Time]float64)

    for _, quantile := range []float64{0.99} {
        // traces
        query := fmt.Sprintf(tracesPxx, quantile)
        rows, err := conn.Query(context.Background(), query)
        if err != nil {
            panic(err)
        }
        for rows.Next() {
            var (
                serviceName string
                ts          time.Time
                value       float64
            )
            if err := rows.Scan(&serviceName, &ts, &value); err != nil {
                panic(err)
            }
            if _, ok := tracesLatencyByServiceName[serviceName]; !ok {
                tracesLatencyByServiceName[serviceName] = make(map[time.Time]float64)
            }
            tracesLatencyByServiceName[serviceName][ts] = value
        }
        rows.Close()
        if err := rows.Err(); err != nil {
            panic(err)
        }

        // cumulative metrics
        metricsCumulativeQuery := fmt.Sprintf(metricsPxxCumulative, quantile)
        rows, err = conn.Query(context.Background(), metricsCumulativeQuery)
        if err != nil {
            panic(err)
        }
        for rows.Next() {
            var (
                serviceName string
                ts          time.Time
                value       float64
            )
            if err := rows.Scan(&serviceName, &ts, &value); err != nil {
                panic(err)
            }
            if _, ok := metricsCumulativeLatencyByServiceName[serviceName]; !ok {
                metricsCumulativeLatencyByServiceName[serviceName] = make(map[time.Time]float64)
            }
            metricsCumulativeLatencyByServiceName[serviceName][ts] = value
        }
        rows.Close()
        if err := rows.Err(); err != nil {
            panic(err)
        }

        // delta metrics
        metricsDeltaQuery := fmt.Sprintf(metricsPxxDelta, quantile)
        rows, err = conn.Query(context.Background(), metricsDeltaQuery)
        if err != nil {
            panic(err)
        }
        for rows.Next() {
            var (
                serviceName string
                ts          time.Time
                value       float64
            )
            if err := rows.Scan(&serviceName, &ts, &value); err != nil {
                panic(err)
            }
            if _, ok := metricsDeltaLatencyByServiceName[serviceName]; !ok {
                metricsDeltaLatencyByServiceName[serviceName] = make(map[time.Time]float64)
            }
            metricsDeltaLatencyByServiceName[serviceName][ts] = value
        }
        rows.Close()
        if err := rows.Err(); err != nil {
            panic(err)
        }
    }

    // Print one line per (service, minute) with the cumulative, delta, and
    // traces values side by side; missing entries read as 0 from the nil
    // inner maps.
    for serviceName := range metricsCumulativeLatencyByServiceName {
        for ts, value := range metricsCumulativeLatencyByServiceName[serviceName] {
            deltaValue := metricsDeltaLatencyByServiceName[serviceName][ts]
            tracesValue := tracesLatencyByServiceName[serviceName][ts]
            fmt.Printf("%s || %v || %f || %f || %f\n", serviceName, ts, value, deltaValue, tracesValue)
        }
    }
}