module github.com/JensRantil/kafka-pinger

go 1.15

require (
	github.com/frankban/quicktest v1.11.3 // indirect
	github.com/golang/protobuf v1.5.1 // indirect
	github.com/golang/snappy v0.0.2 // indirect
	github.com/klauspost/compress v1.11.7 // indirect
	github.com/kr/text v0.2.0 // indirect
	github.com/pierrec/lz4 v2.6.0+incompatible // indirect
	github.com/prometheus/client_golang v1.10.0
	github.com/prometheus/common v0.19.0 // indirect
	github.com/segmentio/kafka-go v0.4.12
	github.com/stretchr/testify v1.7.0 // indirect
	go.uber.org/multierr v1.6.0 // indirect
	go.uber.org/zap v1.16.0
	golang.org/x/crypto v0.0.0-20210317152858-513c2a44f670 // indirect
	golang.org/x/lint v0.0.0-20200302205851-738671d3881b // indirect
	golang.org/x/net v0.0.0-20210316092652-d523dce5a7f4 // indirect
	golang.org/x/text v0.3.5 // indirect
	golang.org/x/tools v0.0.0-20210106214847-113979e3529a // indirect
	gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect
	gopkg.in/yaml.v2 v2.4.0 // indirect
	gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect
	honnef.co/go/tools v0.0.1-2020.1.4 // indirect
)
package main

import (
	"context"
	"crypto/tls"
	"errors"
	"flag"
	"fmt"
	"net/http"
	_ "net/http/pprof"
	"reflect"
	"strings"
	"time"

	"go.uber.org/zap"
	"go.uber.org/zap/zapcore"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	"github.com/prometheus/client_golang/prometheus/promhttp"
	kafka "github.com/segmentio/kafka-go"
)
const (
	localhost     = "localhost:9092"
	promNamespace = "kafkaping"
)

var (
	kafkaURL = flag.String("kafka-url", localhost, fmt.Sprintf("Kafka bootstrap endpoint. (default: %s)", localhost))
	// There is possibly a race/bug in the kafka-go producer that does not play
	// nicely with a 1-second interval: with a 1-second interval the measured
	// latency is consistently about 1 second when it should normally be
	// 10-20 ms. Default to 2 seconds for now.
	probeInterval = flag.Duration("probe-interval", 2*time.Second, "How often the probes should run.")
	probeTimeout  = flag.Duration("probe-timeout", 10*time.Second, "Maximum time a single probe is allowed to take.")
	pingTimeout   = flag.Duration("ping-timeout", 5*time.Second, "Maximum time a single probe test is allowed to take.")
	topicName     = flag.String("topic", "", "Topic name.")
	listen        = flag.String("listen", ":8080", "The interface to listen on.")
	tlsEnabled    = flag.Bool("tls", false, "Use TLS when connecting to Kafka.")
	debug         = flag.Bool("debug", false, "Set to true for verbose logging.")
)
var (
	pingLatency                  prometheus.Histogram
	errCounter                   *prometheus.CounterVec
	successCounter               prometheus.Counter
	totalCounter                 prometheus.Counter
	staleMessageCounter          prometheus.Counter
	leaderDownTimeSecondsCounter prometheus.Counter
	bootstrapLatency             prometheus.Histogram
)

var log *zap.Logger

// PingID is the globally unique correlation ID for each ping.
type PingID int64
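// Illustrative sketch of the correlation mechanism: a PingID is the probe's
// start time in UnixNano. It is carried as the Kafka message key in decimal
// form (see sender.Send below) and parsed back by parsePingID in the poller
// file. The value below is a made-up example:
//
//	id := PingID(time.Now().UnixNano())  // e.g. 1618732800123456789
//	key := []byte(fmt.Sprintf("%d", id)) // what sender.Send publishes
//	parsed := parsePingID(string(key))   // what the poller recovers
//	// parsed == id
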
func newLogger(level zapcore.Level) *zap.Logger {
	log, err := zapConfig(level).Build()
	if err != nil {
		panic(err)
	}
	return log
}

func zapConfig(level zapcore.Level) zap.Config {
	cfg := zap.NewProductionConfig()
	cfg.Level = zap.NewAtomicLevelAt(level)
	cfg.EncoderConfig.EncodeTime = zapcore.ISO8601TimeEncoder
	cfg.EncoderConfig.TimeKey = "timestamp"
	cfg.EncoderConfig.MessageKey = "message"
	cfg.EncoderConfig.EncodeLevel = zapcore.CapitalLevelEncoder
	cfg.EncoderConfig.EncodeDuration = zapcore.StringDurationEncoder
	return cfg
}
func main() {
	flag.Parse()

	constLabels := prometheus.Labels{"topicname": *topicName}
	pingLatency = promauto.NewHistogram(prometheus.HistogramOpts{
		Namespace:   promNamespace,
		Name:        "roundtrip_latency_seconds",
		Help:        "Histogram of the round-trip latency for sending a message to Kafka and receiving it back.",
		ConstLabels: constLabels,
	})
	errCounter = promauto.NewCounterVec(
		prometheus.CounterOpts{
			Namespace:   promNamespace,
			Name:        "probe_errors_total",
			Help:        "Counters for the different types of errors.",
			ConstLabels: constLabels,
		},
		[]string{"errortype"},
	)
	successCounter = promauto.NewCounter(
		prometheus.CounterOpts{
			Namespace:   promNamespace,
			Name:        "probe_success_total",
			Help:        "Total number of probes that succeeded.",
			ConstLabels: constLabels,
		},
	)
	totalCounter = promauto.NewCounter(
		prometheus.CounterOpts{
			Namespace:   promNamespace,
			Name:        "probe_total",
			Help:        "Total number of probes executed.",
			ConstLabels: constLabels,
		},
	)
	staleMessageCounter = promauto.NewCounter(
		prometheus.CounterOpts{
			Namespace:   promNamespace,
			Name:        "stale_messages_total",
			Help:        "Total number of messages that arrived after the pinger had unsubscribed.",
			ConstLabels: constLabels,
		},
	)
	leaderDownTimeSecondsCounter = promauto.NewCounter(
		prometheus.CounterOpts{
			Namespace:   promNamespace,
			Name:        "leader_downtime_seconds",
			Help:        "Total number of seconds the partition leader has been unavailable.",
			ConstLabels: constLabels,
		},
	)
	bootstrapLatency = promauto.NewHistogram(prometheus.HistogramOpts{
		Namespace:   promNamespace,
		Name:        "kafka_connect_latency",
		Help:        "Histogram of the latency of writers making the initial connection to the Kafka bootstrap endpoint.",
		ConstLabels: constLabels,
	})
	var level zapcore.Level
	if *debug {
		level = zapcore.DebugLevel
	} else {
		level = zapcore.InfoLevel
	}
	log = newLogger(level)

	go startWebServer()
	log.Info("started")

	ctx := context.Background()
	t := time.Tick(*probeInterval)
	staleMessageCounterIncFun := func() { staleMessageCounter.Inc() }
	leaderDownTimeSecondsIncFun := func(add float64) { leaderDownTimeSecondsCounter.Add(add) }

	var tlsConfig *tls.Config
	if *tlsEnabled {
		tlsConfig = &tls.Config{
			InsecureSkipVerify: true,
		}
	}
	dialer := &kafka.Dialer{
		Timeout: 10 * time.Second,
		TLS:     tlsConfig,
	}

	poller := StartPoller(ctx, *kafkaURL, *topicName, dialer, staleMessageCounterIncFun, leaderDownTimeSecondsIncFun)
	for range t {
		go func() {
			timerCtx, cancel := context.WithTimeout(ctx, *probeTimeout)
			defer cancel()
			err := runProbe(timerCtx, *kafkaURL, *pingTimeout, poller, dialer)
			if err == nil {
				successCounter.Inc()
			} else {
				incErrMetric(err)
			}
			totalCounter.Inc()
		}()
	}
}
// incErrMetric logs the error and increments the error counter, using the
// error's Go type name as the "errortype" label value. For example, a
// timed-out probe increments kafkaping_probe_errors_total with
// errortype="main.ErrPingTimeout".
func incErrMetric(err error) {
	metricSymbol := reflect.ValueOf(err).Type().String()
	log.Error("Error running probe",
		zap.String("MetricSymbol", metricSymbol),
		zap.Error(err))
	errCounter.With(prometheus.Labels{"errortype": metricSymbol}).Inc()
}
func runProbe(ctx context.Context, addr string, pingTimeout time.Duration, poller *Poller, dialer *kafka.Dialer) error {
	id := PingID(time.Now().UnixNano())

	sender := connectSender(ctx, addr, *topicName, dialer)
	debugLog(id, "Probe sender connected")
	defer sender.Close()

	startTime := time.Now()
	start := startTime.Format(time.RFC3339Nano)
	data := []byte(start)

	ch := make(chan struct{})
	poller.Subscribe(id, ch)
	defer poller.Unsubscribe(id)

	debugLog(id, "Sending probe message: "+start)
	if err := sender.Send(ctx, id, data); err != nil {
		log.Error("Failed to send", zap.Error(err))
		return err
	}

	select {
	case <-ch:
		endTime := time.Now()
		latency := endTime.Sub(startTime)
		debugLog(id, fmt.Sprintf("Data received, latency=%s", latency))
		registerLatency(pingLatency, latency)
		return nil
	case <-time.After(pingTimeout):
		debugLog(id, "Timed out receiving data")
		return ErrPingTimeout{}
	}
}
func debugLog(id PingID, msg string) {
	log.Debug(msg, zapPingID(id))
}

func zapPingID(id PingID) zap.Field {
	return zap.Int64("ping_id", int64(id))
}

type sender struct {
	writer *kafka.Writer
}

func connectSender(ctx context.Context, addr, topic string, dialer *kafka.Dialer) *sender {
	writer := connectKafkaWriter(ctx, addr, topic, dialer)
	return &sender{writer}
}
func (s *sender) Send(ctx context.Context, id PingID, data []byte) error {
	message := kafka.Message{Key: []byte(fmt.Sprintf("%d", id)), Value: data}
	// The writer is not configured in async mode, so this call blocks until
	// the message has been acked by Kafka.
	if err := s.writer.WriteMessages(ctx, message); err != nil {
		return ErrUnableToPublish{err}
	}
	return nil
}

func (s *sender) Close() error {
	return s.writer.Close()
}
// ErrUnableToPublish is returned when publishing to Kafka fails.
type ErrUnableToPublish struct{ Err error }

func (e ErrUnableToPublish) Error() string { return "Unable to publish: " + e.Err.Error() }
func (e ErrUnableToPublish) Unwrap() error { return e.Err }

// ErrPingTimeout is returned when a pong is not received in time.
type ErrPingTimeout struct{}

func (e ErrPingTimeout) Error() string { return "Ping timed out" }
func (e ErrPingTimeout) Unwrap() error { return errors.New("timeout") }
func connectKafkaWriter(ctx context.Context, addr, topic string, dialer *kafka.Dialer) *kafka.Writer {
	connectStart := time.Now()
	// Wrap the deferred call in a closure so the elapsed time is measured when
	// the function returns; deferring registerLatency(...) directly would
	// evaluate time.Since(connectStart) immediately and record ~0.
	defer func() { registerLatency(bootstrapLatency, time.Since(connectStart)) }()
	return getKafkaWriter(addr, topic, dialer)
}
var secondAsFloat = float64(time.Second)

func durationToSeconds(d time.Duration) float64 {
	return float64(d) / secondAsFloat
}

func registerLatency(h prometheus.Observer, d time.Duration) {
	h.Observe(durationToSeconds(d))
}
func getKafkaWriter(kafkaURL, topic string, dialer *kafka.Dialer) *kafka.Writer {
	logger, errorLogger := getKafkaLoggers("writer")
	return kafka.NewWriter(kafka.WriterConfig{
		Brokers: strings.Split(kafkaURL, ","),
		Topic:   topic,
		// Do not retry over the same connection.
		MaxAttempts: 1,
		// Set the batch timeout to 1ns to avoid buffering before sending. It
		// cannot be set to 0 because 0 is treated as the default (1 second).
		BatchTimeout: 1,
		Balancer:     &kafka.LeastBytes{},
		Logger:       logger,
		ErrorLogger:  errorLogger,
		Dialer:       dialer,
	})
}
func getKafkaLoggers(name string) (kafka.Logger, kafka.Logger) {
	var d, e kafka.LoggerFunc
	d = func(fmtStr string, vals ...interface{}) {
		log.Debug(fmt.Sprintf(fmtStr, vals...), zap.String("name", name))
	}
	e = func(fmtStr string, vals ...interface{}) {
		log.Error(fmt.Sprintf(fmtStr, vals...), zap.String("name", name))
	}
	if *debug {
		return kafka.Logger(d), kafka.Logger(e)
	}
	return nil, kafka.Logger(e)
}
func startWebServer() {
	http.Handle("/metrics", promhttp.Handler())
	log.Info("Starting to serve Prometheus metrics", zap.String("listen", *listen))
	if err := http.ListenAndServe(*listen, nil); err != nil {
		log.Error("Web server stopped", zap.Error(err))
	}
}
package main

import (
	"context"
	"strconv"
	"strings"
	"sync"
	"time"

	"go.uber.org/zap"

	kafka "github.com/segmentio/kafka-go"
)

const (
	leaderDownTimeThreshold = 10 * time.Second
)
// Poller is a wrapper around a Kafka reader that manages a set of message
// subscribers. Each subscriber is interested in a one-time delivery and
// should unsubscribe as soon as the desired message has been received.
// Subscriber workflow (see the illustrative sketch after StartPoller below):
// 0. Generate a one-time ID to use as the loopback message.
// 1. Subscribe for the loopback message (expect a Kafka message with key=ID).
// 2. Send the loopback message to Kafka (with key=ID).
// 3. Wait for the message delivery from the poller (with a timeout).
// 4. Unsubscribe, regardless of whether the wait timed out or not.
type Poller struct {
	reader                   *kafka.Reader
	lock                     sync.Mutex
	subscribers              map[PingID]chan struct{}
	staleInc                 func()
	leaderDownTimeSecondsInc func(float64)
}
// StartPoller starts the ping message poller.
func StartPoller(ctx context.Context,
	kafkaURL string,
	topicName string,
	dialer *kafka.Dialer,
	staleMessageCounterIncFunc func(),
	leaderDownTimeSecondsIncFunc func(float64)) *Poller {
	p := &Poller{
		reader:                   connectReader(kafkaURL, topicName, dialer),
		lock:                     sync.Mutex{},
		subscribers:              make(map[PingID]chan struct{}),
		staleInc:                 staleMessageCounterIncFunc,
		leaderDownTimeSecondsInc: leaderDownTimeSecondsIncFunc,
	}
	go p.run(ctx)
	return p
}
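// examplePollerSubscriber is an illustrative sketch (not called anywhere) of
// the subscriber workflow documented on Poller above. In the real pinger the
// publish step is performed by sender.Send in main.go; the 5-second timeout
// used here is an arbitrary assumption.
func examplePollerSubscriber(p *Poller) error {
	id := PingID(time.Now().UnixNano()) // 0. generate a one-time ID
	ch := make(chan struct{})
	p.Subscribe(id, ch)     // 1. register interest in messages with key=id
	defer p.Unsubscribe(id) // 4. always unsubscribe, timed out or not

	// 2. ...publish a Kafka message whose Key is the decimal form of id...

	select { // 3. wait for the loopback delivery or give up
	case <-ch:
		return nil
	case <-time.After(5 * time.Second):
		return ErrPingTimeout{}
	}
}
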
// Subscribe registers a channel that is signalled when a message with the
// given ping ID arrives. It panics if the ID is already subscribed.
func (p *Poller) Subscribe(id PingID, c chan struct{}) {
	debugLog(id, "Subscribing")
	p.withSubscriber(id,
		func(_ chan struct{}) {
			log.Panic("Already subscribed", zapPingID(id))
		},
		func() {
			p.subscribers[id] = c
		})
}
// Unsubscribe removes the subscriber for the given ping ID at the end of a
// ping. It panics if the ID is not subscribed.
func (p *Poller) Unsubscribe(id PingID) {
	p.withSubscriber(id,
		func(_ chan struct{}) {
			delete(p.subscribers, id)
		},
		func() {
			log.Panic("Not subscribed", zapPingID(id))
		})
}
// withSubscriber runs existFunc with the subscriber channel for id if one is
// registered, otherwise nonExistFunc, while holding the poller lock.
func (p *Poller) withSubscriber(id PingID, existFunc func(chan struct{}), nonExistFunc func()) {
	p.lock.Lock()
	defer p.lock.Unlock()
	subscriber, exists := p.subscribers[id]
	if exists {
		existFunc(subscriber)
	} else {
		nonExistFunc()
	}
}
func (p *Poller) run(ctx context.Context) {
	for {
		p.runSinglePoll(ctx)
	}
}
func (p *Poller) runSinglePoll(ctx context.Context) {
	timeoutCtx, cancel := context.WithTimeout(ctx, leaderDownTimeThreshold)
	defer cancel()
	// err is non-nil after MaxAttempts failed attempts or when the timeout
	// context expires.
	message, err := p.reader.ReadMessage(timeoutCtx)
	if err != nil {
		log.Error("Failed to read", zap.Error(err))
		p.leaderDownTimeSecondsInc(leaderDownTimeThreshold.Seconds())
		return
	}
	idStr := string(message.Key)
	id := parsePingID(idStr)
	p.withSubscriber(id, func(subscriber chan struct{}) {
		// Non-blocking send: if the prober has already timed out but not yet
		// unsubscribed, nobody is receiving on the channel, and blocking here
		// while holding the poller lock would deadlock Unsubscribe.
		select {
		case subscriber <- struct{}{}:
		default:
		}
	}, func() {
		p.staleInc()
		d := time.Now().UnixNano() - int64(id)
		log.Warn("Received stale ping",
			zapPingID(id),
			zap.Duration("duration", time.Duration(d)))
	})
}
func parsePingID(idStr string) PingID {
	id, err := strconv.ParseInt(idStr, 10, 64)
	if err != nil {
		log.Panic("Failed to parse ping id", zap.Error(err))
	}
	return PingID(id)
}
const noTestID = 0

func connectReader(kafkaURL, topic string, dialer *kafka.Dialer) *kafka.Reader {
	r := getKafkaReader(kafkaURL, topic, dialer)
	r.SetOffset(kafka.LastOffset)
	debugLog(noTestID, "Reader connected.")
	return r
}
func getKafkaReader(kafkaURL, topic string, dialer *kafka.Dialer) *kafka.Reader {
	logger, errorLogger := getKafkaLoggers("reader")
	brokers := strings.Split(kafkaURL, ",")
	return kafka.NewReader(kafka.ReaderConfig{
		Brokers: brokers,
		// Do not use a consumer group, to avoid the overhead of group
		// rebalancing.
		GroupID: "",
		// When not in a consumer group, a partition is required, and we have
		// only one partition for the __ping topic.
		Partition: 0,
		Topic:     topic,
		// MinBytes is tuned for best latency, making Kafka respond to fetch
		// requests as soon as any message has been appended, i.e. no
		// broker-side buffering (which is optimized for throughput).
		MinBytes: 1,
		// MaxWait is the maximum buffering time on the Kafka broker side;
		// hitting either the MinBytes or the MaxWait limit makes Kafka reply.
		// NOTE: It is important that this is greater than *probeInterval.
		MaxWait:     time.Hour,
		MaxBytes:    1000,
		Logger:      logger,
		ErrorLogger: errorLogger,
		// Use a fixed backoff pace with 'infinite' attempts, since we use a
		// context with timeout to control the overall read deadline.
		MaxAttempts:    100000,
		ReadBackoffMin: time.Second,
		ReadBackoffMax: time.Second,
		Dialer:         dialer,
	})
}