Skip to content

Instantly share code, notes, and snippets.

@wingedpig
Last active October 5, 2017 16:46
Show Gist options
  • Save wingedpig/c635e77fcd42d95cfa7923a541724f6b to your computer and use it in GitHub Desktop.
Save wingedpig/c635e77fcd42d95cfa7923a541724f6b to your computer and use it in GitHub Desktop.
Test Kafka producer
package main
import (
"fmt"
"log"
"os"
"strings"
"sync"
"github.com/Shopify/sarama"
)
// producer is the process-wide sarama async producer. main assigns it once
// at startup and sendMessage writes to its input channel.
var producer sarama.AsyncProducer
// main is a small test harness for sarama's AsyncProducer. It connects to the
// broker at 127.0.0.1:9092, queues 15 messages of 3000 bytes each on the
// "test" topic, shuts the producer down, and finally logs how many sends were
// acknowledged and how many failed.
func main() {
	sarama.Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags)

	kconfig := sarama.NewConfig()
	// Successes are requested, so the Successes channel is drained below.
	kconfig.Producer.Return.Successes = true

	// Declare err separately and assign with "=" so the package-level
	// producer is set rather than shadowed by a new local.
	var err error
	producer, err = sarama.NewAsyncProducer([]string{"127.0.0.1:9092"}, kconfig)
	if err != nil {
		log.Fatalf("sarama.NewAsyncProducer returned %s", err)
	}

	var (
		wg        sync.WaitGroup
		successes int
		failures  int // named to avoid shadowing the stdlib "errors" package
	)

	// Each counter is written by exactly one goroutine and only read after
	// wg.Wait() returns, so no additional locking is required.
	wg.Add(1)
	go func() {
		defer wg.Done()
		for range producer.Successes() {
			successes++
		}
	}()

	wg.Add(1)
	go func() {
		defer wg.Done()
		for err := range producer.Errors() {
			log.Printf("sarama error: %s", err)
			failures++
		}
	}()

	data := strings.Repeat("a", 3000)
	for i := 0; i < 15; i++ {
		log.Printf("Sending %d", i)
		if err := sendMessage("test", i, data); err != nil {
			log.Printf("sendMessage %d: %s", i, err)
		}
	}

	producer.AsyncClose() // Trigger a shutdown of the producer.
	// The drain goroutines return once their channels are closed as part of
	// the producer shutdown; wait for both before reading the counters.
	wg.Wait()

	log.Printf("Finished: %d successes, %d errors", successes, failures)
}
// sendMessage builds a producer message for topic, keyed by the decimal form
// of id and carrying val as its payload, and hands it to the package-level
// producer's input channel. The send blocks until the producer accepts the
// message. The returned error is always nil.
func sendMessage(topic string, id int, val string) error {
	// Optional delay between sends, left disabled (the "time" package is not
	// imported by this file):
	// time.Sleep(100 * time.Millisecond)
	msg := new(sarama.ProducerMessage)
	msg.Topic = topic
	msg.Key = sarama.StringEncoder(fmt.Sprint(id))
	msg.Value = sarama.StringEncoder(val)

	producer.Input() <- msg
	return nil
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment