Last active
January 14, 2016 04:48
-
-
Save hackintoshrao/27732ce9f597694dcc40 to your computer and use it in GitHub Desktop.
Just a mock up code which I'm using to test
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"encoding/json" | |
"fmt" | |
"github.com/Shopify/sarama" | |
"strings" | |
"time" | |
) | |
type test struct { | |
A int | |
B int | |
encoded []byte | |
err error | |
} | |
func (ale *test) ensureEncoded() { | |
if ale.encoded == nil && ale.err == nil { | |
ale.encoded, ale.err = json.Marshal(ale) | |
} | |
} | |
func (ale *test) Length() int { | |
ale.ensureEncoded() | |
return len(ale.encoded) | |
} | |
func (ale *test) Encode() ([]byte, error) { | |
ale.ensureEncoded() | |
return ale.encoded, ale.err | |
} | |
type KafkaConnect struct { | |
DataCollector sarama.SyncProducer | |
AccessLogProducer sarama.AsyncProducer | |
} | |
func (s *KafkaConnect) SyncClose() error { | |
if err := s.DataCollector.Close(); err != nil { | |
return err | |
} | |
return nil | |
} | |
func newDataCollector(brokerList []string) (sarama.SyncProducer, error) { | |
// For the data collector, we are looking for strong consistency semantics. | |
// Because we don't change the flush settings, sarama will try to produce messages | |
// as fast as possible to keep latency low. | |
config := sarama.NewConfig() | |
config.Producer.Partitioner = sarama.NewManualPartitioner | |
config.Producer.RequiredAcks = sarama.WaitForAll // Wait for all in-sync replicas to ack the message | |
config.Producer.Retry.Max = 10 // Retry up to 10 times to produce the message | |
// On the broker side, you may want to change the following settings to get | |
// stronger consistency guarantees: | |
// - For your broker, set `unclean.leader.election.enable` to false | |
// - For the topic, you could increase `min.insync.replicas`. | |
producer, err := sarama.NewSyncProducer(brokerList, config) | |
if err != nil { | |
return nil, err | |
} | |
return producer, nil | |
} | |
func NewSyncKafkaProducer() (*KafkaConnect, error) { | |
brokers := "localhost:9092" //connecting to the local kafka instance | |
brokerList := strings.Split(brokers, ",") | |
producer, err := newDataCollector(brokerList) | |
if err != nil { | |
return nil, err | |
} | |
server := &KafkaConnect{ | |
DataCollector: producer, | |
} | |
return server, nil | |
} | |
func (s *KafkaConnect) WriteIntoSyncProducer(topic string, data sarama.Encoder, partition int32) error { | |
// We are not setting a message key, which means that all messages will | |
// be distributed randomly over the different partitions. | |
sk := sarama.ProducerMessage{ | |
Topic: topic, //writing into the test topic | |
Value: data, //writing the string message into the topic | |
Partition: partition, | |
} | |
partition, offset, err := s.DataCollector.SendMessage(&sk) | |
if err != nil { | |
return err | |
} else { | |
// The tuple (topic, partition, offset) can be used as a unique identifier | |
// for a message in a Kafka cluster. | |
fmt.Printf("\nYour data is stored with unique identifier %v/%v\n", partition, offset) | |
return nil | |
} | |
} | |
var dataChan chan test | |
func main() { | |
dataChan = make(chan test, 4) | |
go setupKafkaInsert() | |
item := test{} | |
item.A = 1 | |
item.B = 2 | |
dataChan <- item | |
time.Sleep(10 * time.Second) | |
} | |
func setupKafkaInsert() { | |
producer, err := NewSyncKafkaProducer() | |
if err != nil { | |
panic(err.Error()) | |
} | |
for { | |
select { | |
case dataItem := <-dataChan: | |
errProducer := producer.WriteIntoSyncProducer("my_topic", &dataItem, int32(4)) | |
if errProducer != nil { | |
panic(err.Error()) | |
} | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment