PromQL vs Flux

This query calculates success, redirect, client-error, and server-error rates, plus total requests, per service and environment

data = from(bucket: "istio")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r._measurement == "istio_requests_total")
  |> filter(fn: (r) => r._field == "counter")
  |> filter(fn: (r) => r.reporter == "destination" )
  |> filter(fn: (r) => r.request_protocol == "http")
  |> difference(nonNegative: true, columns: ["_value"])
  |> sum()

total_requests = data
  |> group(columns: ["destination_service_name", "env"])
  |> sum()
  |> set(key: "response_code", value: "total")

success_request = data
  |> filter(fn: (r) => r.response_code =~ /^2/ )
  |> set(key: "response_code", value: "2xx")
  |> group(columns: ["response_code", "destination_service_name", "env"])
  |> sum()

redirected = data
  |> filter(fn: (r) => r.response_code =~ /^3/ )
  |> set(key: "response_code", value: "3xx")
  |> group(columns: ["response_code", "destination_service_name", "env"])
  |> sum()

client_error = data
  |> filter(fn: (r) => r.response_code =~ /^4/ )
  |> set(key: "response_code", value: "4xx")
  |> group(columns: ["response_code", "destination_service_name", "env"])
  |> sum()

server_error = data
  |> filter(fn: (r) => r.response_code =~ /^5/ )
  |> set(key: "response_code", value: "5xx")
  |> group(columns: ["response_code", "destination_service_name", "env"])
  |> sum()


union(tables: [total_requests, success_request, redirected, client_error, server_error])
  |> group()
  |> pivot(rowKey: ["destination_service_name", "env"], columnKey: ["response_code"], valueColumn: "_value")
  |> map(fn: (r) => {
    return {r with 
      success_rate: float(v: r["2xx"]) / float(v: r.total) * 100.0,
      redirected_rate: float(v: r["3xx"]) / float(v: r.total) * 100.0,
      client_error_rate: float(v: r["4xx"]) / float(v: r.total) * 100.0,
      server_error_rate: float(v: r["5xx"]) / float(v: r.total) * 100.0,      
    }
  })
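
The four per-class branches differ only in the regex and the label, so they could be collapsed into a small pipe-forwardable helper, in the same style as the aggregate helper further down. A sketch (byClass is a hypothetical name):

byClass = (tables=<-, pattern, label) =>
  tables
    |> filter(fn: (r) => r.response_code =~ pattern)
    |> set(key: "response_code", value: label)
    |> group(columns: ["response_code", "destination_service_name", "env"])
    |> sum()

success_request = data |> byClass(pattern: /^2/, label: "2xx")
redirected = data |> byClass(pattern: /^3/, label: "3xx")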

And in PromQL. Note that these use !~, so each line gives the fraction of requests outside the given class, the complement of the corresponding Flux rate:

sum(irate(istio_requests_total{reporter="destination",destination_workload_namespace=~"$namespace",destination_workload=~"$workload",response_code!~"2.*"}[5m])) / sum(irate(istio_requests_total{reporter="destination",destination_workload_namespace=~"$namespace",destination_workload=~"$workload"}[5m]))
sum(irate(istio_requests_total{reporter="destination",destination_workload_namespace=~"$namespace",destination_workload=~"$workload",response_code!~"3.*"}[5m])) / sum(irate(istio_requests_total{reporter="destination",destination_workload_namespace=~"$namespace",destination_workload=~"$workload"}[5m]))
sum(irate(istio_requests_total{reporter="destination",destination_workload_namespace=~"$namespace",destination_workload=~"$workload",response_code!~"4.*"}[5m])) / sum(irate(istio_requests_total{reporter="destination",destination_workload_namespace=~"$namespace",destination_workload=~"$workload"}[5m]))
sum(irate(istio_requests_total{reporter="destination",destination_workload_namespace=~"$namespace",destination_workload=~"$workload",response_code!~"5.*"}[5m])) / sum(irate(istio_requests_total{reporter="destination",destination_workload_namespace=~"$namespace",destination_workload=~"$workload"}[5m]))

This query calculates latency percentiles (p50, p90, p95, p99, p9999) per env, service, and response code

data = from(bucket:"istio")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r._measurement == "istio_request_duration_seconds")
  |> filter(fn: (r) => r.reporter == "destination")
  |> filter(fn: (r) => r._field != "count" and r._field != "sum")
  |> window(every: duration(v: int(v: v.timeRangeStop) - int(v: v.timeRangeStart)))
  |> sum()
  |> map(fn:(r) => ({r with le: float(v:r._field)}))

p50 = data
    |> set(key: "_field", value: "p50")
    |> histogramQuantile(quantile: 0.50)
    
p90 = data
    |> set(key: "_field", value: "p90")
    |> histogramQuantile(quantile: 0.90)
    
p95 = data
    |> set(key: "_field", value: "p95")
    |> histogramQuantile(quantile: 0.95)
    
p99 = data
    |> set(key: "_field", value: "p99")
    |> histogramQuantile(quantile: 0.99)
    
p9999 = data
    |> set(key: "_field", value: "p9999")
    |> histogramQuantile(quantile: 0.9999)

union(tables: [p50, p90, p95, p99, p9999])
    |> duplicate(column: "_stop", as:"_time")
    |> window(every:inf)
    |> group(columns:[ "_field", "destination_service_name", "response_code", "env"]) 
    |> pivot(rowKey:["_time"], columnKey: ["_field"],valueColumn: "_value")
    |> group()
    |> map(fn:(r) => ({r with response_code: "CODE: " + string(v:r.response_code)})) 
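
The five percentile branches repeat the same two steps, so they factor out the same way. A sketch (percentile is a hypothetical helper):

percentile = (tables=<-, q, label) =>
  tables
    |> set(key: "_field", value: label)
    |> histogramQuantile(quantile: q)

p50 = data |> percentile(q: 0.50, label: "p50")
p99 = data |> percentile(q: 0.99, label: "p99")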

Percentile query in PromQL (not the exact same query, but the same data source; all five lines are needed to get the same result as the single Flux query)

histogram_quantile(0.50, sum(irate(istio_request_bytes_bucket{reporter="destination", connection_security_policy="mutual_tls", destination_workload=~"$workload", destination_workload_namespace=~"$namespace", source_workload=~"$srcwl", source_workload_namespace=~"$srcns"}[1m])) by (source_workload, source_workload_namespace, le))
histogram_quantile(0.90, sum(irate(istio_request_bytes_bucket{reporter="destination", connection_security_policy="mutual_tls", destination_workload=~"$workload", destination_workload_namespace=~"$namespace", source_workload=~"$srcwl", source_workload_namespace=~"$srcns"}[1m])) by (source_workload, source_workload_namespace, le))
histogram_quantile(0.95, sum(irate(istio_request_bytes_bucket{reporter="destination", connection_security_policy="mutual_tls", destination_workload=~"$workload", destination_workload_namespace=~"$namespace", source_workload=~"$srcwl", source_workload_namespace=~"$srcns"}[1m])) by (source_workload, source_workload_namespace, le))
histogram_quantile(0.99, sum(irate(istio_request_bytes_bucket{reporter="destination", connection_security_policy="mutual_tls", destination_workload=~"$workload", destination_workload_namespace=~"$namespace", source_workload=~"$srcwl", source_workload_namespace=~"$srcns"}[1m])) by (source_workload, source_workload_namespace, le))
histogram_quantile(0.9999, sum(irate(istio_request_bytes_bucket{reporter="destination", connection_security_policy="mutual_tls", destination_workload=~"$workload", destination_workload_namespace=~"$namespace", source_workload=~"$srcwl", source_workload_namespace=~"$srcns"}[1m])) by (source_workload, source_workload_namespace, le))

Query to see latency for a given service in a given env over a period of time

data = from(bucket:"istio")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r._measurement == "istio_request_duration_seconds")
  |> filter(fn: (r) => r.reporter == "destination")
  |> filter(fn: (r) => r.env == v.env)
  |> filter(fn: (r) => r.destination_service_name == v.istio_destination_service_name)
  |> filter(fn: (r) => r._field != "count" and r._field != "sum")
  |> window(every: v.windowPeriod)
  |> sum()
  |> map(fn:(r) => ({r with le: float(v:r._field)}))

p50 = data
  |> set(key: "_field", value: "p50")
  |> histogramQuantile(quantile: 0.50)
    
p90 = data
  |> set(key: "_field", value: "p90")
  |> histogramQuantile(quantile: 0.90)
    
p95 = data
  |> set(key: "_field", value: "p95")
  |> histogramQuantile(quantile: 0.95)
    
p99 = data
  |> set(key: "_field", value: "p99")
  |> histogramQuantile(quantile: 0.99)
    
p9999 = data
  |> set(key: "_field", value: "p9999")
  |> histogramQuantile(quantile: 0.9999)

union(tables: [p50, p90, p95, p99, p9999])
  |> duplicate(column: "_stop", as:"_time")
  |> window(every:inf)
  |> keep(columns: ["_time", "response_code","_field", "_value"])
  |> sort(columns: ["_time"])
  

Query to identify long-running queries

threshold = 1000000000 // 10^9 ns = 1 s; totalDuration is assumed to be reported in nanoseconds

from(bucket: "apps")
 |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
 |> filter(fn: (r) =>
   r._measurement == "query_log"
   and r.role =~ /^queryd/
   and r.env == v.env
   and (r._field == "totalDuration" or r._field == "ot_trace_id")
 )
 |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
 |> filter(fn: (r) => r.totalDuration >= threshold)
 |> group()
 |> map(fn: (r) => {
   // ot_trace_sampled is assumed to be a tag: only totalDuration and ot_trace_id are pivoted in as fields above
   trace_link = if r.ot_trace_sampled == "true" then "https://tracing.aws.influxdata.io/trace/${r.ot_trace_id}" else "trace ${r.ot_trace_id} is not sampled"
   env = if r.env == "acc" then "acceptance" else r.env // TODO(gavincabbage): this is a hack
   log_link = "https://logs.aws.influxdata.io/streams/000000000000000000000001/search?rangetype=relative&fields=source%2Cmessage&width=2560&highlightMessage=&relative=86400&q=environment%3A${env}%20AND%20%22ot_trace_id%3D${r.ot_trace_id}%22"
   return {r with trace_link: trace_link, log_link: log_link}
 })
 |> keep(columns: ["_time", "totalDuration", "role", "host", "nodename", "orgID", "trace_link", "log_link", "error", "errorCode", "source"])
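
Since the threshold is a nanosecond count, it could also be written with a duration literal, which reads more clearly; int() converts a duration to its nanosecond count:

threshold = int(v: 1s) // 1,000,000,000 ns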

Query to get uptime of ArgoCD

import "system"
from(bucket: "apps")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r._measurement == "process_start_time_seconds")
  |> filter(fn: (r) => r._field == "gauge")
  |> filter(fn: (r) => r.role == "argocd")
  |> filter(fn: (r) => r.url == "http://argocd-server-metrics:8083/metrics")
  |> last()
  |> toInt()
  |> map(fn: (r) => ({ r with _value: int(v: now()) - r._value * 1000000000 })) // now (ns since epoch) minus start time scaled from s to ns = uptime in ns
  |> map(fn: (r) => ({ r with _value: string(v: duration(v: r._value)) })) // render as a human-readable duration string
  |> keep(columns: ["_value"])
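
As a standalone check of the unit math, the same two map() calls can run against a made-up start time (a sketch; assumes a Flux version that ships the array package):

import "array"

array.from(rows: [{_value: 1580800000}]) // hypothetical process_start_time_seconds value
  |> map(fn: (r) => ({ r with _value: int(v: now()) - r._value * 1000000000 }))
  |> map(fn: (r) => ({ r with _value: string(v: duration(v: r._value)) })) // e.g. "167h13m20s"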

Query from Russ

interval = 1h
exchange = "binance"
coin = "BCH"
data = 
    from(bucket: "candles")
      |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
      |> filter(fn: (r) => r.exchange == exchange)
      |> filter(fn: (r) => r._measurement == coin)
      |> filter(fn: (r) => r._field == "price")
aggregate = (tables=<-, name, fn) =>
    tables
      |> aggregateWindow(every: interval, fn: fn)
      |> yield(name: name)

data |> aggregate(name: "close", fn: last)
data |> aggregate(name: "open", fn: first)
data |> aggregate(name: "high", fn: max)
data |> aggregate(name: "low", fn: min)

The same query in InfluxQL:

SELECT
  first(price) AS open,
  last(price) AS close,
  max(price) AS high,
  min(price) AS low
FROM candles
GROUP BY time(1h)