Skip to content

Instantly share code, notes, and snippets.

@JensRantil
Created April 18, 2021 08:06
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save JensRantil/e36af87c59c2f2b73eb03957df7d3183 to your computer and use it in GitHub Desktop.
Save JensRantil/e36af87c59c2f2b73eb03957df7d3183 to your computer and use it in GitHub Desktop.
module github.com/JensRantil/kafka-pinger
go 1.15
require (
github.com/frankban/quicktest v1.11.3 // indirect
github.com/golang/protobuf v1.5.1 // indirect
github.com/golang/snappy v0.0.2 // indirect
github.com/klauspost/compress v1.11.7 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/pierrec/lz4 v2.6.0+incompatible // indirect
github.com/prometheus/client_golang v1.10.0
github.com/prometheus/common v0.19.0 // indirect
github.com/segmentio/kafka-go v0.4.12
github.com/stretchr/testify v1.7.0 // indirect
go.uber.org/multierr v1.6.0 // indirect
go.uber.org/zap v1.16.0
golang.org/x/crypto v0.0.0-20210317152858-513c2a44f670 // indirect
golang.org/x/lint v0.0.0-20200302205851-738671d3881b // indirect
golang.org/x/net v0.0.0-20210316092652-d523dce5a7f4 // indirect
golang.org/x/text v0.3.5 // indirect
golang.org/x/tools v0.0.0-20210106214847-113979e3529a // indirect
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect
honnef.co/go/tools v0.0.1-2020.1.4 // indirect
)
package main
import (
"context"
"crypto/tls"
"errors"
"flag"
"fmt"
"net/http"
_ "net/http/pprof"
"reflect"
"strings"
"time"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/client_golang/prometheus/promhttp"
kafka "github.com/segmentio/kafka-go"
)
// localhost is the default value of the -kafka-url flag.
const localhost = "localhost:9092"

// promNamespace is the Namespace set on every Prometheus metric created in main.
const promNamespace = "kafkaping"
// Command-line flags. Their values are only meaningful after flag.Parse has
// run, which happens at the top of main.
var (
	// The flag package appends the default value to the usage text on its
	// own, so the description does not repeat it.
	kafkaURL = flag.String("kafka-url", localhost, "kafka bootstrap endpoint.")

	// There may be a race/bug in the kafka-go producer that does not work
	// nicely with a 1 second interval: when probing every second, the latency
	// is constantly about 1 second while it should normally be 10-20ms.
	// Default to 2 seconds for now.
	probeInterval = flag.Duration("probe-interval", 2*time.Second, "How often the probes should run.")

	// probeTimeout bounds a whole probe (it becomes the probe's context
	// deadline); pingTimeout bounds only the wait for the message to come back.
	probeTimeout = flag.Duration("probe-timeout", 10*time.Second, "Maximum time a single probe is allowed to take.")
	pingTimeout  = flag.Duration("ping-timeout", 5*time.Second, "Maximum time a single probe test is allowed to take.")

	topicName  = flag.String("topic", "", "Topic name")
	listen     = flag.String("listen", ":8080", "The interface to listen on.")
	tlsEnabled = flag.Bool("tls", false, "Use TLS when connecting to kafka")
	debug      = flag.Bool("debug", false, "Set to true for verbose logging.")
)
// Prometheus collectors used by the pinger. They are left nil here and are
// created in main once the flags are parsed, because each of them carries the
// topic name as a constant label.
var (
	// Histograms.
	pingLatency      prometheus.Histogram // round trip of a ping through Kafka
	bootstrapLatency prometheus.Histogram // time spent setting up a writer

	// Probe outcome counters.
	totalCounter   prometheus.Counter
	successCounter prometheus.Counter
	errCounter     *prometheus.CounterVec // labelled by "errortype"

	// Poller counters.
	staleMessageCounter          prometheus.Counter
	leaderDownTimeSecondsCounter prometheus.Counter
)
// log is the process-wide structured logger. It is nil until main assigns it,
// which happens after the -debug flag has been read.
var log *zap.Logger
// PingID is the globally unique correlation ID for each ping.
type PingID int64
// newLogger builds a zap logger from the configuration returned by zapConfig
// for the given level. It panics if the logger cannot be built.
func newLogger(level zapcore.Level) *zap.Logger {
	cfg := zapConfig(level)
	logger, err := cfg.Build()
	if err != nil {
		panic(err)
	}
	return logger
}
// zapConfig returns zap's production configuration at the given level, with
// the encoder adjusted: ISO 8601 timestamps under the "timestamp" key, the
// message under the "message" key, capitalised level names and durations
// rendered as strings.
func zapConfig(level zapcore.Level) zap.Config {
	cfg := zap.NewProductionConfig()
	cfg.Level = zap.NewAtomicLevelAt(level)

	enc := &cfg.EncoderConfig
	enc.TimeKey = "timestamp"
	enc.MessageKey = "message"
	enc.EncodeTime = zapcore.ISO8601TimeEncoder
	enc.EncodeLevel = zapcore.CapitalLevelEncoder
	enc.EncodeDuration = zapcore.StringDurationEncoder

	return cfg
}
// main parses the flags, registers the Prometheus metrics, starts the metrics
// web server and the poller, and then launches one probe per probe interval.
// It does not return under normal operation.
func main() {
	flag.Parse()

	initMetrics(prometheus.Labels{"topicname": *topicName})

	level := zapcore.InfoLevel
	if *debug {
		level = zapcore.DebugLevel
	}
	log = newLogger(level)

	// A non-positive interval cannot drive a ticker; fail loudly instead of
	// sitting idle without ever probing.
	if *probeInterval <= 0 {
		log.Fatal("probe-interval must be positive", zap.Duration("probe-interval", *probeInterval))
	}

	go startWebServer()
	log.Info("started")

	ctx := context.Background()

	var tlsConfig *tls.Config
	if *tlsEnabled {
		// NOTE(security): certificate verification is disabled, so the TLS
		// connection is encrypted but the broker's identity is not checked.
		tlsConfig = &tls.Config{
			InsecureSkipVerify: true,
		}
	}
	dialer := &kafka.Dialer{
		Timeout: 10 * time.Second,
		TLS:     tlsConfig,
	}

	poller := StartPoller(ctx, *kafkaURL, *topicName, dialer,
		func() { staleMessageCounter.Inc() },
		func(add float64) { leaderDownTimeSecondsCounter.Add(add) },
	)

	ticker := time.NewTicker(*probeInterval)
	defer ticker.Stop()
	for range ticker.C {
		// Each probe runs in its own goroutine so a slow probe does not
		// delay the next tick; probeTimeout bounds its lifetime.
		go func() {
			timerCtx, cancel := context.WithTimeout(ctx, *probeTimeout)
			defer cancel()

			if err := runProbe(timerCtx, *kafkaURL, *pingTimeout, poller, dialer); err != nil {
				incErrMetric(err)
			} else {
				successCounter.Inc()
			}
			totalCounter.Inc()
		}()
	}
}

// initMetrics creates every Prometheus collector used by the pinger through
// promauto, attaching constLabels to each of them.
func initMetrics(constLabels prometheus.Labels) {
	pingLatency = promauto.NewHistogram(prometheus.HistogramOpts{
		Namespace:   promNamespace,
		Name:        "roundtrip_latency_seconds",
		Help:        "A latency histogram over roundtrip for sending a message to Kafka and back.",
		ConstLabels: constLabels,
	})
	errCounter = promauto.NewCounterVec(
		prometheus.CounterOpts{
			Namespace:   promNamespace,
			Name:        "probe_errors_total",
			Help:        "Counters for different types of errors.",
			ConstLabels: constLabels,
		},
		[]string{"errortype"},
	)
	successCounter = promauto.NewCounter(
		prometheus.CounterOpts{
			Namespace:   promNamespace,
			Name:        "probe_success_total",
			Help:        "Total number of probes that succeeded.",
			ConstLabels: constLabels,
		},
	)
	totalCounter = promauto.NewCounter(
		prometheus.CounterOpts{
			Namespace:   promNamespace,
			Name:        "probe_total",
			Help:        "Total number of probes executed.",
			ConstLabels: constLabels,
		},
	)
	staleMessageCounter = promauto.NewCounter(
		prometheus.CounterOpts{
			Namespace:   promNamespace,
			Name:        "stale_messages_total",
			Help:        "Total number of messages which arrived after the pinger has been unsubscribed.",
			ConstLabels: constLabels,
		},
	)
	leaderDownTimeSecondsCounter = promauto.NewCounter(
		prometheus.CounterOpts{
			Namespace:   promNamespace,
			Name:        "leader_downtime_seconds",
			Help:        "Number of seconds for partition leader being unavailable.",
			ConstLabels: constLabels,
		},
	)
	bootstrapLatency = promauto.NewHistogram(prometheus.HistogramOpts{
		Namespace:   promNamespace,
		Name:        "kafka_connect_latency",
		Help:        "A histogram showing latencies of writers making initial connection toward kafka bootstrap endpoint.",
		ConstLabels: constLabels,
	})
}
// incErrMetric logs a failed probe and increments the error counter, using
// the dynamic Go type of err as the "errortype" label. err must be non-nil.
func incErrMetric(err error) {
	errType := reflect.TypeOf(err).String()

	log.Error("Error running probe",
		zap.String("MetricSymbol", errType),
		zap.Error(err))

	errCounter.WithLabelValues(errType).Inc()
}
// runProbe performs one round trip through Kafka: it creates a sender,
// subscribes to the poller under a fresh ping ID, publishes a message keyed by
// that ID and waits for the poller to signal that the message came back.
//
// It returns nil and records the round-trip latency on success. It returns
// the send error if publishing fails, ErrPingTimeout if nothing arrives within
// pingTimeout, and ctx.Err() if ctx ends first.
func runProbe(ctx context.Context, addr string, pingTimeout time.Duration, poller *Poller, dialer *kafka.Dialer) error {
	// The current time in nanoseconds doubles as the correlation ID.
	id := PingID(time.Now().UnixNano())

	sender := connectSender(ctx, addr, *topicName, dialer)
	debugLog(id, "Probe sender connected")
	defer sender.Close()

	startTime := time.Now()
	start := startTime.Format(time.RFC3339Nano)

	// Buffered so that a delivery can always be handed over without a ready
	// receiver. With an unbuffered channel, a delivery that races with this
	// probe giving up would block the delivering goroutine while the
	// deferred Unsubscribe below waits on it.
	ch := make(chan struct{}, 1)
	poller.Subscribe(id, ch)
	defer poller.Unsubscribe(id)

	debugLog(id, "Sending probe message: "+start)
	if err := sender.Send(ctx, id, []byte(start)); err != nil {
		log.Error("Failed to send", zap.Error(err))
		return err
	}

	// An explicit timer (rather than time.After) is released as soon as the
	// probe finishes.
	timer := time.NewTimer(pingTimeout)
	defer timer.Stop()

	select {
	case <-ch:
		latency := time.Since(startTime)
		debugLog(id, fmt.Sprintf("Data received, latency=%s", latency))
		registerLatency(pingLatency, latency)
		return nil
	case <-timer.C:
		debugLog(id, "Timed out receiving data")
		return ErrPingTimeout{}
	case <-ctx.Done():
		// The overall probe deadline (or a cancellation) wins over the
		// remaining ping timeout.
		debugLog(id, "Probe context done before data was received")
		return ctx.Err()
	}
}
// debugLog writes msg at debug level, tagged with the ping ID.
func debugLog(id PingID, msg string) {
	idField := zapPingID(id)
	log.Debug(msg, idField)
}
// zapPingID returns the log field that carries a ping ID under "ping_id".
func zapPingID(id PingID) zap.Field {
	const key = "ping_id"
	return zap.Int64(key, int64(id))
}
// sender publishes ping messages through a kafka-go writer.
type sender struct {
	// writer is the underlying kafka-go writer; Close closes it.
	writer *kafka.Writer
}
// connectSender returns a sender backed by a writer obtained from
// connectKafkaWriter for the given brokers and topic.
func connectSender(ctx context.Context, addr, topic string, dialer *kafka.Dialer) *sender {
	return &sender{writer: connectKafkaWriter(ctx, addr, topic, dialer)}
}
// Send publishes data as a single message whose key is the decimal form of
// id. A write failure is returned wrapped in ErrUnableToPublish.
func (s *sender) Send(ctx context.Context, id PingID, data []byte) error {
	key := fmt.Sprintf("%d", id)
	msg := kafka.Message{Key: []byte(key), Value: data}

	// The writer is not configured for async mode, so this call blocks
	// until the message is acked by kafka.
	err := s.writer.WriteMessages(ctx, msg)
	if err == nil {
		return nil
	}
	return ErrUnableToPublish{Err: err}
}
// Close closes the underlying writer and returns its error, if any.
func (s *sender) Close() error {
	err := s.writer.Close()
	return err
}
// ErrUnableToPublish reports that a ping message could not be published.
// Err holds the underlying cause and must be non-nil.
type ErrUnableToPublish struct {
	Err error
}

// Error implements the error interface.
func (e ErrUnableToPublish) Error() string {
	cause := e.Err.Error()
	return "Unable to publish: " + cause
}

// Unwrap returns the underlying cause so that errors.Is and errors.As can
// inspect it.
func (e ErrUnableToPublish) Unwrap() error {
	return e.Err
}
// errTimeout is the fixed cause reported by ErrPingTimeout.Unwrap. Keeping a
// single value (instead of allocating a new error per call) makes the cause
// comparable, so errors.Is can match it.
var errTimeout = errors.New("timeout")

// ErrPingTimeout is the error returned when no pong was received in time.
type ErrPingTimeout struct{}

// Error implements the error interface.
func (e ErrPingTimeout) Error() string { return "Ping timed out" }

// Unwrap returns the same "timeout" error value on every call.
func (e ErrPingTimeout) Unwrap() error { return errTimeout }
// connectKafkaWriter creates a kafka-go writer for the given brokers and
// topic and records how long that took in the bootstrapLatency histogram.
// ctx is accepted for symmetry with the callers and is currently unused.
func connectKafkaWriter(ctx context.Context, addr, topic string, dialer *kafka.Dialer) *kafka.Writer {
	connectStart := time.Now()
	// The measurement must happen inside the deferred closure: arguments of a
	// directly deferred call are evaluated when the defer statement runs,
	// which would always record a near-zero duration.
	defer func() {
		registerLatency(bootstrapLatency, time.Since(connectStart))
	}()
	return getKafkaWriter(addr, topic, dialer)
}
// secondAsFloat is the number of nanoseconds in one second, as a float64.
var secondAsFloat = float64(time.Second)

// durationToSeconds converts d to a floating-point number of seconds.
func durationToSeconds(d time.Duration) float64 {
	nanos := float64(d)
	return nanos / secondAsFloat
}
// registerLatency records d, converted to seconds, as one observation on h.
func registerLatency(h prometheus.Observer, d time.Duration) {
	seconds := durationToSeconds(d)
	h.Observe(seconds)
}
// getKafkaWriter builds a kafka-go writer for topic. kafkaURL is a
// comma-separated list of broker addresses.
func getKafkaWriter(kafkaURL, topic string, dialer *kafka.Dialer) *kafka.Writer {
	logger, errorLogger := getKafkaLoggers("writer")

	cfg := kafka.WriterConfig{
		Brokers:  strings.Split(kafkaURL, ","),
		Topic:    topic,
		Balancer: &kafka.LeastBytes{},
		Dialer:   dialer,

		// Do not retry over the same connection.
		MaxAttempts: 1,

		// One nanosecond, to avoid buffering before a send. Zero cannot be
		// used because it is treated as the default (1 second).
		BatchTimeout: time.Nanosecond,

		Logger:      logger,
		ErrorLogger: errorLogger,
	}
	return kafka.NewWriter(cfg)
}
// getKafkaLoggers returns the (regular, error) logger pair handed to kafka-go
// readers and writers. Both forward to the package logger and tag each line
// with name. The regular logger is nil unless -debug is set; when set, it
// logs at debug level.
func getKafkaLoggers(name string) (kafka.Logger, kafka.Logger) {
	errorLogger := kafka.LoggerFunc(func(fmtStr string, vals ...interface{}) {
		log.Error(fmt.Sprintf(fmtStr, vals...), zap.String("name", name))
	})
	if !*debug {
		return nil, errorLogger
	}

	debugLogger := kafka.LoggerFunc(func(fmtStr string, vals ...interface{}) {
		log.Debug(fmt.Sprintf(fmtStr, vals...), zap.String("name", name))
	})
	return debugLogger, errorLogger
}
// startWebServer registers the Prometheus handler at /metrics on the default
// mux and serves it on the -listen address. Because the default mux is used,
// the handlers registered by the net/http/pprof import are served as well.
// The call blocks for as long as the server runs, so it is meant to be run in
// its own goroutine.
func startWebServer() {
	http.Handle("/metrics", promhttp.Handler())
	log.Info("Start to serve Prometheus metrics", zap.String("port", *listen))

	// ListenAndServe only returns on failure (e.g. the address is already in
	// use); report it instead of losing the metrics endpoint silently.
	if err := http.ListenAndServe(*listen, nil); err != nil {
		log.Error("Metrics web server stopped", zap.String("listen", *listen), zap.Error(err))
	}
}
package main
import (
"context"
"strconv"
"strings"
"sync"
"time"
"go.uber.org/zap"
kafka "github.com/segmentio/kafka-go"
)
// leaderDownTimeThreshold is the deadline applied to a single read in
// runSinglePoll; a failed read adds this many seconds to the leader-downtime
// counter.
const leaderDownTimeThreshold = 10 * time.Second
// Poller wraps a kafka reader and dispatches incoming messages to a set of
// subscribers. Every subscriber waits for exactly one message and should
// unsubscribe as soon as that message has arrived.
//
// A subscriber proceeds as follows:
//
//  0. Generate a one-time ID to use as the loopback message key.
//  1. Subscribe for the loopback message (a kafka message with key=ID).
//  2. Send the loopback message to kafka (with key=ID).
//  3. Wait, with a timeout, for the poller to deliver the message.
//  4. Unsubscribe, whether or not the wait timed out.
type Poller struct {
	// reader is the kafka reader the messages are read from.
	reader *kafka.Reader

	// staleInc is called for every message that has no subscriber.
	staleInc func()
	// leaderDownTimeSecondsInc is called with a number of seconds whenever
	// a read fails.
	leaderDownTimeSecondsInc func(float64)

	// lock guards subscribers.
	lock        sync.Mutex
	subscribers map[PingID]chan struct{}
}
// StartPoller connects a reader to topicName on the brokers listed in
// kafkaURL, starts the polling loop in a new goroutine and returns the poller.
// staleMessageCounterIncFunc is invoked for messages nobody is subscribed to;
// leaderDownTimeSecondsIncFunc is invoked with a number of seconds when a read
// fails.
func StartPoller(ctx context.Context,
	kafkaURL, topicName string,
	dialer *kafka.Dialer,
	staleMessageCounterIncFunc func(),
	leaderDownTimeSecondsIncFunc func(float64)) *Poller {
	poller := new(Poller)
	poller.reader = connectReader(kafkaURL, topicName, dialer)
	poller.subscribers = make(map[PingID]chan struct{})
	poller.staleInc = staleMessageCounterIncFunc
	poller.leaderDownTimeSecondsInc = leaderDownTimeSecondsIncFunc

	go poller.run(ctx)
	return poller
}
// Subscribe registers c as the channel to signal when the message keyed by id
// is read. It panics (through the logger) if id is already subscribed.
func (p *Poller) Subscribe(id PingID, c chan struct{}) {
	debugLog(id, "Subscribing")

	onDuplicate := func(_ chan struct{}) {
		log.Panic("Already subscribed", zapPingID(id))
	}
	register := func() {
		p.subscribers[id] = c
	}
	p.withSubscriber(id, onDuplicate, register)
}
// Unsubscribe removes the subscription for id at the end of a ping. It panics
// (through the logger) if id is not subscribed.
func (p *Poller) Unsubscribe(id PingID) {
	remove := func(_ chan struct{}) {
		delete(p.subscribers, id)
	}
	onMissing := func() {
		log.Panic("Not subscribed", zapPingID(id))
	}
	p.withSubscriber(id, remove, onMissing)
}
// withSubscriber looks up id while holding the poller lock and calls
// existFunc with the registered channel if there is one, or nonExistFunc
// otherwise. The callback runs with the lock still held.
func (p *Poller) withSubscriber(id PingID, existFunc func(chan struct{}), nonExistFunc func()) {
	p.lock.Lock()
	defer p.lock.Unlock()

	if ch, ok := p.subscribers[id]; ok {
		existFunc(ch)
		return
	}
	nonExistFunc()
}
// run polls for messages until ctx is done and then closes the reader.
// Checking ctx between polls prevents the loop from spinning on reads that
// fail immediately once the context has been cancelled.
func (p *Poller) run(ctx context.Context) {
	for ctx.Err() == nil {
		p.runSinglePoll(ctx)
	}
	if err := p.reader.Close(); err != nil {
		log.Error("Failed to close reader", zap.Error(err))
	}
}
// runSinglePoll reads one message, bounded by leaderDownTimeThreshold, and
// signals the subscriber registered under the message key. A failed read is
// logged and accounted as leader downtime. A message without a subscriber is
// counted as stale.
func (p *Poller) runSinglePoll(ctx context.Context) {
	timeoutCtx, cancel := context.WithTimeout(ctx, leaderDownTimeThreshold)
	defer cancel()

	// err is non-nil in case of failure after MaxAttempts, or when
	// timeoutCtx ends first.
	message, err := p.reader.ReadMessage(timeoutCtx)
	if err != nil {
		log.Error("Failed to read", zap.Error(err))
		p.leaderDownTimeSecondsInc(leaderDownTimeThreshold.Seconds())
		return
	}

	id := parsePingID(string(message.Key))

	// Only look the subscriber up under the lock; the channel send happens
	// after the lock is released. Sending while holding the lock deadlocks
	// with a subscriber that has stopped waiting and is blocked in
	// Unsubscribe on the same lock.
	var (
		subscriber chan struct{}
		subscribed bool
	)
	p.withSubscriber(id, func(c chan struct{}) {
		subscriber, subscribed = c, true
	}, func() {
		p.staleInc()
		// The ping ID is the send time in Unix nanoseconds.
		d := time.Now().UnixNano() - int64(id)
		log.Warn("Received stale ping",
			zapPingID(id),
			zap.Duration("duration", time.Duration(d)))
	})
	if !subscribed {
		return
	}

	select {
	case subscriber <- struct{}{}:
	case <-timeoutCtx.Done():
		// Nobody took the signal before the poll deadline: the subscriber
		// gave up in the meantime, so the message is effectively stale.
		p.staleInc()
		log.Warn("Subscriber gone before ping could be delivered", zapPingID(id))
	}
}
// parsePingID converts the decimal text of a message key into a PingID. It
// panics (through the logger) if idStr is not a valid base-10 int64.
func parsePingID(idStr string) PingID {
	parsed, err := strconv.ParseInt(idStr, 10, 64)
	if err == nil {
		return PingID(parsed)
	}
	log.Panic("Failed to parse ping id", zap.Error(err))
	return PingID(parsed)
}
// noTestID is the placeholder ping ID for log lines that do not belong to
// any particular ping.
const noTestID = 0
// connectReader creates the kafka reader for topic and positions it at the
// last offset, so that only messages produced from now on are read. kafkaURL
// is a comma-separated list of broker addresses.
func connectReader(kafkaURL, topic string, dialer *kafka.Dialer) *kafka.Reader {
	r := getKafkaReader(kafkaURL, topic, dialer)
	// Report a failed seek instead of dropping it: without the seek the
	// reader keeps whatever start position it was created with.
	if err := r.SetOffset(kafka.LastOffset); err != nil {
		log.Error("Failed to move reader to the last offset", zap.Error(err))
	}
	debugLog(noTestID, "Reader connected.")
	return r
}
// getKafkaReader builds a kafka-go reader for partition 0 of topic, tuned for
// low latency. kafkaURL is a comma-separated list of broker addresses.
func getKafkaReader(kafkaURL, topic string, dialer *kafka.Dialer) *kafka.Reader {
	logger, errorLogger := getKafkaLoggers("reader")

	cfg := kafka.ReaderConfig{
		Brokers: strings.Split(kafkaURL, ","),
		Topic:   topic,
		Dialer:  dialer,

		// No consumer group, to avoid the overhead of group balancing.
		GroupID: "",
		// Without a consumer group a partition is required, and the ping
		// topic has only one partition.
		Partition: 0,

		// MinBytes is tuned for best latency: kafka answers a fetch request
		// as soon as any message has been appended, i.e. without the
		// broker-side buffering that optimises for throughput.
		MinBytes: 1,
		// MaxWait is the maximum buffering time on the broker side; kafka
		// replies when either MinBytes or MaxWait is reached.
		// NOTE: it is important that this is greater than the probe interval.
		MaxWait:  time.Hour,
		MaxBytes: 1000,

		// Fixed backoff pace with 'infinite' attempts; the callers bound the
		// read with a context timeout instead.
		MaxAttempts:    100000,
		ReadBackoffMin: time.Second,
		ReadBackoffMax: time.Second,

		Logger:      logger,
		ErrorLogger: errorLogger,
	}
	return kafka.NewReader(cfg)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment