Created
December 27, 2019 09:11
-
-
Save akaDPR/c9bed40732bf79b616c0d9a1c1d28808 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"context" | |
"encoding/json" | |
"fmt" | |
"log" | |
"time" | |
"github.com/jinzhu/gorm" | |
uuid "github.com/satori/go.uuid" | |
kafka "github.com/segmentio/kafka-go" | |
) | |
var writer *kafka.Writer | |
/*DataTypeUUID :*/ | |
type DataTypeUUID struct { | |
uuid.UUID | |
} | |
/* {"message_id":"c4af0038-8ef9-457e-ad2d-fb0887e93dc2" , "ping_id":"c4af0038-8ef9-457e-ad2d-fb0887e93dc2" , "sender_id":"c4af0038-8ef9-457e-ad2d-fb0887e93dc2" , "message":"json message" , "is_active":true} */ | |
/*PingContent : */ | |
type PingContent struct { | |
MessageID DataTypeUUID `json:"message_id"` | |
PingID DataTypeUUID `json:"ping_id"` | |
SenderID DataTypeUUID `json:"sender_id"` | |
Message string `json:"message"` | |
IsActive bool `json:"is_active"` | |
} | |
func newKafkaWriter(kafkaURL, topic string) *kafka.Writer { | |
return kafka.NewWriter(kafka.WriterConfig{ | |
Brokers: []string{kafkaURL}, | |
Topic: topic, | |
Balancer: &kafka.LeastBytes{}, | |
}) | |
} | |
func getKafkaReader(kafkaURL, topic, groupID string) *kafka.Reader { | |
return kafka.NewReader(kafka.ReaderConfig{ | |
Brokers: []string{kafkaURL}, | |
GroupID: groupID, | |
Topic: topic, | |
MinBytes: 10e3, // 10KB | |
MaxBytes: 10e6, // 10MB | |
}) | |
} | |
/*PushToKafka :*/ | |
func PushToKafka(message []byte) { | |
kafkaURL := "127.0.0.1:9092" | |
kafkaTopic := "sampleTopic" | |
writer := newKafkaWriter(kafkaURL, kafkaTopic) | |
defer writer.Close() | |
kafkaMessage := kafka.Message{ | |
Key: []byte("messageContent"), | |
Value: message, | |
Time: time.Now(), | |
} | |
err := writer.WriteMessages(context.Background(), kafkaMessage) | |
if err != nil { | |
fmt.Println(err) | |
panic(err) | |
} | |
} | |
/*PopFromKafka :..*/ | |
func PopFromKafka(db *gorm.DB) { | |
kafkaURL := "127.0.0.1:9092" | |
kafkaTopic := "sampleTopic" | |
groupID := "crayond" | |
reader := getKafkaReader(kafkaURL, kafkaTopic, groupID) | |
defer reader.Close() | |
for { | |
m, err := reader.ReadMessage(context.Background()) | |
if err != nil { | |
panic(err) | |
} | |
var pc PingContent | |
json.Unmarshal(m.Value, &pc) | |
/* | |
{ | |
"message_id": "c4af0038-8ef9-457e-ad2d-fb0887e93dc2", | |
"ping_id": "c4af0038-8ef9-457e-ad2d-fb0887e93dc2", | |
"sender_id": "c4af0038-8ef9-457e-ad2d-fb0887e93dc2", | |
"message": "json message", | |
"is_active": true | |
} | |
*/ | |
u1 := uuid.Must(uuid.NewV4()) | |
messages := &MessageContent{PingID: u1, SenderID: u1, Message: pc.Message, IsActive: true} | |
/*Call your DB Instance here store in postgresql*/ | |
if db.Create(&messages).Error != nil { | |
log.Panic("ERROR : Unable to create messages ") | |
} | |
fmt.Println("message inserted") | |
defer db.Close() | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment