Skip to content

Instantly share code, notes, and snippets.

@erangaeb
Last active March 5, 2018 09:20
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save erangaeb/2bbb49b7209b6a9ccfc50efb8b1c0077 to your computer and use it in GitHub Desktop.
Save erangaeb/2bbb49b7209b6a9ccfc50efb8b1c0077 to your computer and use it in GitHub Desktop.
golang kafka producer
package main
import (
"fmt"
"log"
"os"
"bufio"
"github.com/Shopify/sarama"
)
const (
kafkaConn = "10.4.1.29:9092"
topic = "senz"
)
func main() {
// create producer
producer, err := initProducer()
if err != nil {
fmt.Println("Error producer: ", err.Error())
os.Exit(1)
}
// read command line input
reader := bufio.NewReader(os.Stdin)
for {
fmt.Print("Enter msg: ")
msg, _ := reader.ReadString('\n')
// publish without goroutene
publish(msg, producer)
// publish with go routene
// go publish(msg, producer)
}
}
func initProducer()(sarama.SyncProducer, error) {
// setup sarama log to stdout
sarama.Logger = log.New(os.Stdout, "", log.Ltime)
// producer config
config := sarama.NewConfig()
config.Producer.Retry.Max = 5
config.Producer.RequiredAcks = sarama.WaitForAll
config.Producer.Return.Successes = true
// async producer
//prd, err := sarama.NewAsyncProducer([]string{kafkaConn}, config)
// sync producer
prd, err := sarama.NewSyncProducer([]string{kafkaConn}, config)
return prd, err
}
func publish(message string, producer sarama.SyncProducer) {
// publish sync
msg := &sarama.ProducerMessage {
Topic: topic,
Value: sarama.StringEncoder(message),
}
p, o, err := producer.SendMessage(msg)
if err != nil {
fmt.Println("Error publish: ", err.Error())
}
// publish async
//producer.Input() <- &sarama.ProducerMessage{
fmt.Println("Partition: ", p)
fmt.Println("Offset: ", o)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment