Skip to content

Instantly share code, notes, and snippets.

@ORBAT
Last active August 29, 2015 14:09
Show Gist options
  • Save ORBAT/d0adcd790dff34b37b04 to your computer and use it in GitHub Desktop.
Save ORBAT/d0adcd790dff34b37b04 to your computer and use it in GitHub Desktop.
Multipartition topic produce test
func TestFuncMultiPartitionProduce(t *testing.T) {
checkKafkaAvailability(t)
Logger = log.New(os.Stderr, "[Sarama] ", log.Lmicroseconds|log.Lshortfile)
client, err := NewClient("functional_test", []string{kafkaAddr}, nil)
if err != nil {
t.Fatal(err)
}
defer safeClose(t, client)
config := newProdConf()
producer, err := NewProducer(client, config)
if err != nil {
t.Fatal(err)
}
var wg sync.WaitGroup
wg.Add(TestBatchSize)
for i := 1; i <= TestBatchSize; i++ {
go func(i int, w *sync.WaitGroup) {
defer w.Done()
promise := make(chan error)
defer close(promise)
msg := &MessageToSend{Topic: "multi_partition", Key: nil, Value: StringEncoder(fmt.Sprintf("hur %d", i)), Promise: promise}
producer.Input() <- msg
select {
case ret := <-producer.Errors():
t.Fatal(ret.Err)
case err := <-promise:
if err != nil {
t.Fatal(err)
}
}
}(i, &wg)
}
wg.Wait()
if err := producer.Close(); err != nil {
t.Error(err)
}
}
func newProdConf() *ProducerConfig {
pc := NewProducerConfig()
pc.FlushFrequency = 50 * time.Millisecond
pc.FlushMsgCount = 200
pc.ChannelBufferSize = 20
return pc
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment