Skip to content

Instantly share code, notes, and snippets.

@superfell
Created February 8, 2018 17:33
Show Gist options
  • Save superfell/cff892d075b7585e46b5f1ec40206dcf to your computer and use it in GitHub Desktop.
Save superfell/cff892d075b7585e46b5f1ec40206dcf to your computer and use it in GitHub Desktop.
package main
import (
"bytes"
"flag"
"fmt"
"log"
"os"
"sync"
"time"
sarama "gopkg.in/Shopify/sarama.v1"
)
const topic = "test2"
func main() {
broker := flag.String("b", "localhost:9092", "Kafka broker to connect to")
pcon := flag.Int("pc", 8, "Concurrency level of producer writes")
nummsg := flag.Int("msg", 50, "Number of test messages to create")
flag.Parse()
kconfig := sarama.NewConfig()
kconfig.Version = sarama.V0_11_0_0
kconfig.Producer.RequiredAcks = sarama.WaitForAll
kconfig.Producer.Retry.Max = 10
kconfig.Producer.Return.Successes = true
kconfig.Consumer.Return.Errors = true
kconfig.Consumer.Fetch.Min = 1
kconfig.Consumer.Fetch.Default = 10 * 1024 * 1024
kconfig.Consumer.Fetch.Max = 25 * 1024 * 1024
client, err := sarama.NewClient([]string{*broker}, kconfig)
if err != nil {
log.Fatalf("Unable to create Kafka client: %v", err)
}
producer, err := sarama.NewSyncProducerFromClient(client)
if err != nil {
log.Fatalf("Unable to start kafka producer: %v", err)
}
consumer, err := sarama.NewConsumerFromClient(client)
if err != nil {
log.Fatalf("Unable to start kafka consumer: %v", err)
}
sarama.Logger = log.New(os.Stdout, "sarama:", 0)
offsetAtStart, err := client.GetOffset(topic, 0, sarama.OffsetNewest)
if err != nil {
log.Fatalf("Unable to read starting offset: %v", err)
}
workCh := make(chan []byte, *nummsg)
expectCh := make(chan expect, *nummsg)
go func() {
for i := 0; i < *nummsg; i++ {
data := []byte(fmt.Sprintf("%d", i))
workCh <- data
}
close(workCh)
}()
pwg := sync.WaitGroup{}
for i := 0; i < *pcon; i++ {
pwg.Add(1)
go producerWorker(producer, workCh, expectCh, &pwg)
}
go func() {
pwg.Wait()
close(expectCh)
}()
writes := make(map[int64][]byte)
for e := range expectCh {
writes[e.offset] = e.value
}
producer.Close()
offsetAfterWrites, err := client.GetOffset(topic, 0, sarama.OffsetNewest)
fmt.Printf("Completed writing test messages, should be in offsets %d -%d\n", offsetAtStart, offsetAfterWrites)
// at this point all the test values have been produced, go comsume them and see if they match
pc, err := consumer.ConsumePartition(topic, 0, offsetAtStart)
if err != nil {
log.Fatalf("Unable to ConsumePartition: %v", err)
}
count := 0
consumed := make(map[int64][]byte)
for {
select {
case e := <-pc.Errors():
log.Fatalf("Error consuming topic: %v", e)
case m := <-pc.Messages():
consumed[m.Offset] = m.Value
count++
if count == *nummsg {
fmt.Printf("Consumed all %d messages we wrote\n", count)
dumpResults(writes, consumed, offsetAtStart, *nummsg)
return
}
case <-time.After(time.Second * 3):
fmt.Printf("Have read %d messages, giving up waiting for more\n", count)
dumpResults(writes, consumed, offsetAtStart, *nummsg)
return
}
}
}
func dumpResults(writes, consumed map[int64][]byte, first int64, num int) {
issues := 0
last := first + int64(num)
for offset := first; offset < last; offset++ {
wrote := writes[offset]
cons, exists := consumed[offset]
if !exists {
fmt.Printf("Offset %d: wrote '%s', consumer never saw this offset\n", offset, wrote)
issues++
} else {
good := bytes.Equal(wrote, cons)
fmt.Printf("Offset %d: wrote '%s', consumed '%s' good?:%v\n", offset, wrote, cons, good)
if !good {
issues++
}
}
}
fmt.Printf("Processed %d messages, saw %d issues\n", num, issues)
}
func producerWorker(p sarama.SyncProducer, workCh chan []byte, resCh chan expect, wg *sync.WaitGroup) {
defer wg.Done()
for w := range workCh {
partition, offset, err := p.SendMessage(&sarama.ProducerMessage{
Topic: topic,
Value: sarama.ByteEncoder(w),
})
if err != nil {
log.Fatalf("Unable to write: %v", err)
}
if partition != 0 {
log.Fatalf("Expected partition to be 0, but was %d", partition)
}
resCh <- expect{
offset: offset,
value: w,
}
}
}
type expect struct {
offset int64
value []byte
}
@superfell
Copy link
Author

results with a producer concurrency of 8, messages don't appear to be at the offset reported by the producer, the consumer see's holes in the offsets.

$ bin/saramac -pc 8 -msg 25
sarama:ClientID is the default of 'sarama', you should consider setting it to something application-specific.
sarama:Connected to broker at 10.236.56.230:9092 (registered as #0)
sarama:producer/broker/0 starting up
sarama:producer/broker/0 state change to [open] on test2/0
sarama:Producer shutting down.
sarama:producer/broker/0 shut down
Completed writing test messages, should be in offsets 44925 -44950
sarama:consumer/broker/0 added subscription to test2/0
Consumed all 25 messages we wrote
Offset 44925: wrote '0', consumer never saw this offset
Offset 44926: wrote '1', consumed '0' good?:false
Offset 44927: wrote '2', consumed '1' good?:false
Offset 44928: wrote '3', consumer never saw this offset
Offset 44929: wrote '4', consumer never saw this offset
Offset 44930: wrote '5', consumer never saw this offset
Offset 44931: wrote '6', consumer never saw this offset
Offset 44932: wrote '7', consumed '2' good?:false
Offset 44933: wrote '8', consumed '3' good?:false
Offset 44934: wrote '9', consumed '4' good?:false
Offset 44935: wrote '10', consumed '5' good?:false
Offset 44936: wrote '11', consumed '8' good?:false
Offset 44937: wrote '12', consumed '9' good?:false
Offset 44938: wrote '13', consumed '10' good?:false
Offset 44939: wrote '14', consumed '11' good?:false
Offset 44940: wrote '15', consumed '12' good?:false
Offset 44941: wrote '16', consumed '13' good?:false
Offset 44942: wrote '17', consumed '14' good?:false
Offset 44943: wrote '19', consumed '15' good?:false
Offset 44944: wrote '18', consumed '16' good?:false
Offset 44945: wrote '20', consumed '17' good?:false
Offset 44946: wrote '21', consumed '19' good?:false
Offset 44947: wrote '22', consumed '18' good?:false
Offset 44948: wrote '23', consumed '20' good?:false
Offset 44949: wrote '24', consumed '24' good?:true
Processed 25 messages, saw 24 issues

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment