
@hackintoshrao
Last active January 14, 2016 04:48
Just a mock-up program I'm using to test producing to Kafka with sarama's SyncProducer.
package main

import (
    "encoding/json"
    "fmt"
    "strings"
    "time"

    "github.com/Shopify/sarama"
)
// test is a sample payload that lazily JSON-encodes itself so it can be
// handed to sarama as a message value.
type test struct {
    A int
    B int

    encoded []byte
    err     error
}
// ensureEncoded marshals the struct to JSON once and caches the result.
func (ale *test) ensureEncoded() {
    if ale.encoded == nil && ale.err == nil {
        ale.encoded, ale.err = json.Marshal(ale)
    }
}

func (ale *test) Length() int {
    ale.ensureEncoded()
    return len(ale.encoded)
}

func (ale *test) Encode() ([]byte, error) {
    ale.ensureEncoded()
    return ale.encoded, ale.err
}
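
// Not in the original gist: a compile-time assertion that *test implements
// sarama.Encoder (the Length and Encode methods above), which is the type
// WriteIntoSyncProducer accepts as a message value.
var _ sarama.Encoder = (*test)(nil)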
type KafkaConnect struct {
    DataCollector     sarama.SyncProducer
    AccessLogProducer sarama.AsyncProducer
}
func (s *KafkaConnect) SyncClose() error {
    return s.DataCollector.Close()
}
func newDataCollector(brokerList []string) (sarama.SyncProducer, error) {
    // For the data collector we want strong consistency semantics.
    // Because we don't change the flush settings, sarama will try to produce
    // messages as fast as possible to keep latency low.
    config := sarama.NewConfig()
    config.Producer.Partitioner = sarama.NewManualPartitioner // honor the Partition field set on each message
    config.Producer.RequiredAcks = sarama.WaitForAll          // wait for all in-sync replicas to ack the message
    config.Producer.Retry.Max = 10                            // retry up to 10 times to produce the message
    // On the broker side, you may want to change the following settings to get
    // stronger consistency guarantees:
    // - For the broker, set `unclean.leader.election.enable` to false.
    // - For the topic, you could increase `min.insync.replicas`.
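    // As a hypothetical example (topic name assumed to match the one used in
    // main below), the topic-level setting can be raised with the stock Kafka
    // CLI tools:
    //
    //   kafka-configs.sh --zookeeper localhost:2181 --alter \
    //     --entity-type topics --entity-name my_topic \
    //     --add-config min.insync.replicas=2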
    producer, err := sarama.NewSyncProducer(brokerList, config)
    if err != nil {
        return nil, err
    }
    return producer, nil
}
func NewSyncKafkaProducer() (*KafkaConnect, error) {
    brokers := "localhost:9092" // connect to the local Kafka instance; accepts a comma-separated list
    brokerList := strings.Split(brokers, ",")
    producer, err := newDataCollector(brokerList)
    if err != nil {
        return nil, err
    }
    server := &KafkaConnect{
        DataCollector: producer,
    }
    return server, nil
}
func (s *KafkaConnect) WriteIntoSyncProducer(topic string, data sarama.Encoder, partition int32) error {
    // Because the producer is configured with the manual partitioner, the
    // message goes to the explicitly chosen partition; we still don't set a
    // message key.
    sk := sarama.ProducerMessage{
        Topic:     topic,     // topic to write into
        Value:     data,      // the encoded message body
        Partition: partition, // explicit target partition (manual partitioner)
    }
    partition, offset, err := s.DataCollector.SendMessage(&sk)
    if err != nil {
        return err
    }
    // The tuple (topic, partition, offset) can be used as a unique identifier
    // for a message in a Kafka cluster.
    fmt.Printf("\nYour data is stored with unique identifier %v/%v/%v\n", topic, partition, offset)
    return nil
}
var dataChan chan test

func main() {
    dataChan = make(chan test, 4) // buffered so the send below won't block
    go setupKafkaInsert()
    item := test{}
    item.A = 1
    item.B = 2
    dataChan <- item
    time.Sleep(10 * time.Second) // crude wait so the goroutine can produce before main exits
}
func setupKafkaInsert() {
    producer, err := NewSyncKafkaProducer()
    if err != nil {
        panic(err.Error())
    }
    // Drain the channel and write each item into partition 4 of "my_topic";
    // the topic must have at least 5 partitions for this to succeed.
    for dataItem := range dataChan {
        if errProducer := producer.WriteIntoSyncProducer("my_topic", &dataItem, int32(4)); errProducer != nil {
            panic(errProducer.Error())
        }
    }
}
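
To check the writes end to end, a minimal companion consumer can tail the same partition. This sketch is not part of the original gist: it assumes the same local broker, topic, and partition as above, and uses sarama's partition-consumer API.

package main

import (
    "fmt"

    "github.com/Shopify/sarama"
)

func main() {
    consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, sarama.NewConfig())
    if err != nil {
        panic(err.Error())
    }
    defer consumer.Close()

    // Tail partition 4 of "my_topic" from the beginning.
    pc, err := consumer.ConsumePartition("my_topic", 4, sarama.OffsetOldest)
    if err != nil {
        panic(err.Error())
    }
    defer pc.Close()

    for msg := range pc.Messages() {
        fmt.Printf("offset %d: %s\n", msg.Offset, string(msg.Value))
    }
}

Run it in a separate process; each produced test item should show up as a small JSON object like {"A":1,"B":2}.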