Skip to content

Instantly share code, notes, and snippets.

@akaDPR
Created December 27, 2019 09:11
Show Gist options
  • Save akaDPR/c9bed40732bf79b616c0d9a1c1d28808 to your computer and use it in GitHub Desktop.
Save akaDPR/c9bed40732bf79b616c0d9a1c1d28808 to your computer and use it in GitHub Desktop.
package main
import (
"context"
"encoding/json"
"fmt"
"log"
"time"
"github.com/jinzhu/gorm"
uuid "github.com/satori/go.uuid"
kafka "github.com/segmentio/kafka-go"
)
var writer *kafka.Writer
/*DataTypeUUID :*/
type DataTypeUUID struct {
uuid.UUID
}
/* {"message_id":"c4af0038-8ef9-457e-ad2d-fb0887e93dc2" , "ping_id":"c4af0038-8ef9-457e-ad2d-fb0887e93dc2" , "sender_id":"c4af0038-8ef9-457e-ad2d-fb0887e93dc2" , "message":"json message" , "is_active":true} */
/*PingContent : */
type PingContent struct {
MessageID DataTypeUUID `json:"message_id"`
PingID DataTypeUUID `json:"ping_id"`
SenderID DataTypeUUID `json:"sender_id"`
Message string `json:"message"`
IsActive bool `json:"is_active"`
}
func newKafkaWriter(kafkaURL, topic string) *kafka.Writer {
return kafka.NewWriter(kafka.WriterConfig{
Brokers: []string{kafkaURL},
Topic: topic,
Balancer: &kafka.LeastBytes{},
})
}
func getKafkaReader(kafkaURL, topic, groupID string) *kafka.Reader {
return kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{kafkaURL},
GroupID: groupID,
Topic: topic,
MinBytes: 10e3, // 10KB
MaxBytes: 10e6, // 10MB
})
}
/*PushToKafka :*/
func PushToKafka(message []byte) {
kafkaURL := "127.0.0.1:9092"
kafkaTopic := "sampleTopic"
writer := newKafkaWriter(kafkaURL, kafkaTopic)
defer writer.Close()
kafkaMessage := kafka.Message{
Key: []byte("messageContent"),
Value: message,
Time: time.Now(),
}
err := writer.WriteMessages(context.Background(), kafkaMessage)
if err != nil {
fmt.Println(err)
panic(err)
}
}
/*PopFromKafka :..*/
func PopFromKafka(db *gorm.DB) {
kafkaURL := "127.0.0.1:9092"
kafkaTopic := "sampleTopic"
groupID := "crayond"
reader := getKafkaReader(kafkaURL, kafkaTopic, groupID)
defer reader.Close()
for {
m, err := reader.ReadMessage(context.Background())
if err != nil {
panic(err)
}
var pc PingContent
json.Unmarshal(m.Value, &pc)
/*
{
"message_id": "c4af0038-8ef9-457e-ad2d-fb0887e93dc2",
"ping_id": "c4af0038-8ef9-457e-ad2d-fb0887e93dc2",
"sender_id": "c4af0038-8ef9-457e-ad2d-fb0887e93dc2",
"message": "json message",
"is_active": true
}
*/
u1 := uuid.Must(uuid.NewV4())
messages := &MessageContent{PingID: u1, SenderID: u1, Message: pc.Message, IsActive: true}
/*Call your DB Instance here store in postgresql*/
if db.Create(&messages).Error != nil {
log.Panic("ERROR : Unable to create messages ")
}
fmt.Println("message inserted")
defer db.Close()
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment