Skip to content

Instantly share code, notes, and snippets.

@ksurent
Last active August 21, 2018 10:24
Show Gist options
  • Save ksurent/ced99c8fd62a75fe329fbd315d819f9b to your computer and use it in GitHub Desktop.
Save ksurent/ced99c8fd62a75fe329fbd315d819f9b to your computer and use it in GitHub Desktop.
A simple simulation to see how client-go's workqueue behaves in presence of stuck consumers
package main
import (
"log"
"math/rand"
"net/http"
"os"
"os/signal"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
krand "k8s.io/apimachinery/pkg/util/rand"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/util/workqueue"
)
func main() {
mp := &metricsProvider{}
workqueue.SetProvider(mp)
wq := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test")
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, os.Interrupt)
go producer(wq)
threadiness := 2
for i := 0; i < threadiness; i++ {
go consumer(i, wq)
}
reg := prometheus.NewRegistry()
reg.MustRegister(mp.Collectors()...)
go func() {
log.Println("starting metrics")
http.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{}))
log.Fatal(http.ListenAndServe(":8889", nil))
}()
<-sigCh
wq.ShutDown()
log.Println("terminating")
}
func consumer(id int, wq workqueue.RateLimitingInterface) {
log.Println("starting consumer", id)
for {
it, stop := wq.Get()
if stop {
break
}
var j time.Duration
if rand.Int31n(100) >= 99 {
j = 100 * time.Second
} else {
j = wait.Jitter(10*time.Millisecond, 20)
}
log.Println("consuming", id, "in", j)
time.Sleep(j)
wq.Forget(it)
wq.Done(it)
}
}
func producer(wq workqueue.RateLimitingInterface) {
log.Println("starting producer")
for {
j := wait.Jitter(1*time.Second, 2)
log.Println("producing in", j)
time.Sleep(j)
wq.Add(krand.String(8))
}
}
type metricsProvider struct {
collectors []prometheus.Collector
}
func (mp *metricsProvider) Collectors() []prometheus.Collector {
return mp.collectors
}
func (mp *metricsProvider) NewDepthMetric(name string) workqueue.GaugeMetric {
g := prometheus.NewGauge(prometheus.GaugeOpts{
Subsystem: name,
Name: "depth",
Help: "queue depth",
})
mp.collectors = append(mp.collectors, g)
return g
}
func (mp *metricsProvider) NewAddsMetric(name string) workqueue.CounterMetric {
c := prometheus.NewCounter(prometheus.CounterOpts{
Subsystem: name,
Name: "adds",
Help: "unique items added",
})
mp.collectors = append(mp.collectors, c)
return c
}
func (mp *metricsProvider) NewLatencyMetric(name string) workqueue.SummaryMetric {
s := prometheus.NewSummary(prometheus.SummaryOpts{
Subsystem: name,
Name: "latency_microseconds",
Help: "how long items wait to be picked up",
})
mp.collectors = append(mp.collectors, s)
return s
}
func (mp *metricsProvider) NewWorkDurationMetric(name string) workqueue.SummaryMetric {
s := prometheus.NewSummary(prometheus.SummaryOpts{
Subsystem: name,
Name: "duration_microseconds",
Help: "how long items take to process",
})
mp.collectors = append(mp.collectors, s)
return s
}
func (mp *metricsProvider) NewRetriesMetric(name string) workqueue.CounterMetric {
c := prometheus.NewCounter(prometheus.CounterOpts{
Subsystem: name,
Name: "retries",
Help: "???",
})
mp.collectors = append(mp.collectors, c)
return c
}
@ksurent
Copy link
Author

ksurent commented Aug 21, 2018

screenshot 2018-08-21 12 12 20

screenshot 2018-08-21 12 12 41

screenshot 2018-08-21 12 13 01

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment