@marselester
Last active January 13, 2022 19:32
Reproduce EOF error in kafka-go v0.4.25 #814
/*
If the topic has no messages left to read, the kafka reader starts to report
"the kafka reader got an unknown error reading partition x of my-topic at offset y: unexpected EOF",
see https://github.com/segmentio/kafka-go/issues/814.
$ go run main.go
2022/01/13 14:10:09 read message: fizz
2022/01/13 14:10:27 the kafka reader got an unknown error reading partition 0 of mytopic at offset 1: unexpected EOF
^C
2022/01/13 14:10:30 failed to read a message: context canceled
2022/01/13 14:10:36 the kafka reader got an unknown error reading partition 0 of mytopic at offset 1: unexpected EOF
The error occurs at this line: https://github.com/segmentio/kafka-go/blob/v0.4.25/reader.go#L1380
*/
package main

import (
	"context"
	"log"
	"os/signal"
	"syscall"

	"github.com/segmentio/kafka-go"
)

func main() {
	var (
		brokers = []string{"127.0.0.1:9092"}
		topic   = "mytopic"
	)

	producer := kafka.NewWriter(kafka.WriterConfig{
		Brokers: brokers,
		Topic:   topic,
	})
	defer func() {
		if err := producer.Close(); err != nil {
			log.Printf("failed to close producer: %v", err)
		}
	}()

	// ErrorLogger surfaces the reader's internal errors,
	// including the "unexpected EOF" report.
	consumer := kafka.NewReader(kafka.ReaderConfig{
		Brokers:     brokers,
		Topic:       topic,
		Partition:   0,
		ErrorLogger: log.Default(),
	})
	defer func() {
		if err := consumer.Close(); err != nil {
			log.Printf("failed to close consumer: %v", err)
		}
	}()

	// Cancel the context on Ctrl+C or SIGTERM so the read loop exits.
	ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
	defer cancel()

	// Write a single message so the reader has exactly one thing to consume.
	if err := producer.WriteMessages(ctx, kafka.Message{Value: []byte("fizz")}); err != nil {
		log.Printf("failed to write a message: %v", err)
		return
	}

	// Keep reading; after "fizz" there is nothing new in the topic.
	for {
		m, err := consumer.ReadMessage(ctx)
		if err != nil {
			log.Printf("failed to read a message: %v", err)
			return
		}
		log.Printf("read message: %s", m.Value)
	}
}
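
In the output above, the "failed to read a message: context canceled" line is the expected result of pressing Ctrl+C, not part of the bug. A minimal sketch of how the read loop could tell a shutdown apart from a real failure, assuming the error returned by ReadMessage is, or wraps, context.Canceled (the log output suggests this, but the sketch does not verify it). The isShutdown helper and the samples slice are hypothetical names made up for illustration.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"io"
	"log"
)

// isShutdown reports whether err is, or wraps, a context cancellation
// or an expired deadline. Hypothetical helper, not part of kafka-go.
func isShutdown(err error) bool {
	return errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded)
}

// samples are made-up errors standing in for what a read loop might see.
var samples = []error{
	context.Canceled,
	fmt.Errorf("failed to read a message: %w", context.Canceled),
	io.ErrUnexpectedEOF,
}

func main() {
	for _, err := range samples {
		if isShutdown(err) {
			// Expected on Ctrl+C: report it as a shutdown, not a failure.
			log.Printf("shutting down: %v", err)
			continue
		}
		log.Printf("failed to read a message: %v", err)
	}
}
```

This only changes how the final log line reads; it has no effect on the "unexpected EOF" report, which comes from the reader's ErrorLogger rather than from ReadMessage.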
@marselester

docker-compose.yml
version: '3'
services:
  zookeeper:
    image: wurstmeister/zookeeper:latest
    ports:
      - "2181:2181"

  kafka:
    image: wurstmeister/kafka:2.13-2.8.1
    ports:
      - "9092:9092"
    environment:
      - KAFKA_ADVERTISED_HOST_NAME
      - KAFKA_CREATE_TOPICS=mytopic:1:1
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock

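You can run Kafka in Docker. The ipconfig getifaddr en0 command below is macOS-specific; on other systems, set KAFKA_ADVERTISED_HOST_NAME to the host's IP address some other way.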

$ KAFKA_ADVERTISED_HOST_NAME=$(ipconfig getifaddr en0) docker-compose up
$ go run main.go
2022/01/13 14:10:09 read message: fizz
2022/01/13 14:10:27 the kafka reader got an unknown error reading partition 0 of mytopic at offset 1: unexpected EOF
^C
2022/01/13 14:10:30 failed to read a message: context canceled
2022/01/13 14:10:36 the kafka reader got an unknown error reading partition 0 of mytopic at offset 1: unexpected EOF
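
Until the cause is fixed, the noise can be hidden with a filtering logger. This is a rough sketch, not a fix: it only suppresses the log line and assumes the reader's ErrorLogger accepts any value with a Printf(string, ...interface{}) method. The filteredLogger type and isUnexpectedEOFReport function are hypothetical names, and the matched text is taken from the log output above rather than from the library source.

```go
package main

import (
	"fmt"
	"log"
	"strings"
)

// isUnexpectedEOFReport reports whether a formatted log message looks like
// the "unexpected EOF" line shown in the output above.
func isUnexpectedEOFReport(msg string) bool {
	return strings.Contains(msg, "the kafka reader got an unknown error") &&
		strings.HasSuffix(msg, ": unexpected EOF")
}

// filteredLogger drops the unexpected EOF report and forwards everything
// else to next. Hypothetical type, not part of kafka-go.
type filteredLogger struct {
	next *log.Logger
}

// Printf formats the message first, because the error text arrives as an
// argument and is not part of the format string.
func (l filteredLogger) Printf(format string, args ...interface{}) {
	msg := fmt.Sprintf(format, args...)
	if isUnexpectedEOFReport(msg) {
		return
	}
	l.next.Print(msg)
}

func main() {
	logger := filteredLogger{next: log.Default()}

	// Suppressed: matches the report from the reproduction.
	logger.Printf("the kafka reader got an unknown error reading partition %d of %s at offset %d: %s",
		0, "mytopic", 1, "unexpected EOF")

	// Forwarded: any other message passes through unchanged.
	logger.Printf("failed to read a message: %v", "context canceled")
}
```

Under that assumption, it would be passed as ErrorLogger: filteredLogger{next: log.Default()} in the ReaderConfig from main.go. Matching on message text is brittle and may also hide a genuine EOF caused by a broken connection, so it is only suitable as a stopgap.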