Skip to content

Instantly share code, notes, and snippets.

@dkoston
Last active September 14, 2018 21:05
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save dkoston/aab79446dc411c9d125a412ea20802c4 to your computer and use it in GitHub Desktop.
Save dkoston/aab79446dc411c9d125a412ea20802c4 to your computer and use it in GitHub Desktop.
package main
import (
"context"
"github.com/segmentio/kafka-go"
"github.com/segmentio/kafka-go/snappy"
"log"
)
// Message is the payload placed in the Value of every message built by
// generateMessages. Its text suggests it is meant to be repetitive enough to
// benefit from compression.
const Message = "long message that needs compression a a a a a a a a a a a a a a"
// main writes a batch of messages to a Kafka topic through a writer configured
// with the snappy codec, reads the same number of messages back through a
// consumer group, and logs a small table of how many of them carried the
// snappy codec versus no codec at all.
func main() {
	// Named once so the reader config, the writer config and the log lines
	// cannot drift apart.
	const (
		broker  = "localhost:9092"
		topic   = "topic-A"
		groupID = "group1"
	)

	ctx := context.Background()

	r := kafka.NewReader(kafka.ReaderConfig{
		Brokers:   []string{broker},
		Topic:     topic,
		GroupID:   groupID,
		Partition: 0,
		MinBytes:  1e3,
		MaxBytes:  10e3,
	})
	// closeReader releases the reader; it is used on both the failure path
	// and the normal exit path below.
	closeReader := func() {
		if err := r.Close(); err != nil {
			log.Printf("Failed to close reader: %v", err)
		}
	}

	w := kafka.NewWriter(kafka.WriterConfig{
		Brokers:          []string{broker},
		Topic:            topic,
		CompressionCodec: snappy.NewCompressionCodec(),
	})
	log.Printf("Using snappy for message compression")

	messages := generateMessages(10)
	log.Printf("Writing %d messages to %s", len(messages), topic)
	writeErr := w.WriteMessages(ctx, messages...)
	// The writer is not needed after this single batch, so release it right
	// away regardless of whether the write succeeded.
	if err := w.Close(); err != nil {
		log.Printf("Failed to close writer: %v", err)
	}
	if writeErr != nil {
		// log.Fatalf exits without running deferred calls, so the reader is
		// closed by hand first.
		closeReader()
		log.Fatalf("Failed to write messages to %s: %v", topic, writeErr)
	}
	defer closeReader()

	messageCount := 0
	compressedCount := 0
	uncompressedCount := 0
	log.Printf("Reading %d messages from %s as %s", len(messages), topic, groupID)
	// NOTE(review): a failed fetch still consumes one loop iteration, so fewer
	// than len(messages) messages may be counted. Fetched messages are also
	// never committed via CommitMessages — confirm both are intended.
	for j := 0; j < len(messages); j++ {
		m, err := r.FetchMessage(ctx)
		if err != nil {
			log.Printf("Failed to get message from kafka: %v", err)
			continue
		}
		messageCount++
		switch {
		case m.CompressionCodec == nil:
			uncompressedCount++
		case m.CompressionCodec.Code() == snappy.Code:
			compressedCount++
		default:
			// Counted in messageCount only; neither column of the table.
			log.Printf("Errant codec: %d", m.CompressionCodec.Code())
		}
	}

	log.Println("|----------|--------------|---------------|")
	log.Println("| Messages | Uncompressed | Compressed |")
	log.Println("|----------|--------------|---------------|")
	log.Printf("| %8d | %12d | %13d |", messageCount, uncompressedCount, compressedCount)
	log.Println("|----------|--------------|---------------|")
}
// generateMessages returns n messages that all share the key "partition1" and
// carry the package-level Message constant as their value. When n is zero or
// negative nothing is appended and the nil slice is returned.
func generateMessages(n int) (messages []kafka.Message) {
	for remaining := n; remaining > 0; remaining-- {
		// Fresh byte slices per message so no two messages share backing storage.
		messages = append(messages, kafka.Message{
			Key:   []byte("partition1"),
			Value: []byte(Message),
		})
	}
	return messages
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment