Skip to content

Instantly share code, notes, and snippets.

@velotiotech
Created June 11, 2020 07:42
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save velotiotech/ea4f249fe7d48becca0a60ad3bffbc9b to your computer and use it in GitHub Desktop.
Save velotiotech/ea4f249fe7d48becca0a60ad3bffbc9b to your computer and use it in GitHub Desktop.
package producer
import (
	"config"
	"fmt"
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/aws/aws-sdk-go/service/kinesis"
	mqtt "github.com/eclipse/paho.mqtt.golang"
)
// postDataTokinesisStream is the MQTT message handler used by the producer.
// It logs the incoming message, then puts the message payload as a single
// record onto the Kinesis stream named by config, using the MQTT topic as the
// partition key. The PutRecord error or result is printed to stdout; nothing
// is returned to the caller.
func postDataTokinesisStream(client mqtt.Client, message mqtt.Message) {
	fmt.Printf("Received message on topic: %s\nMessage: %s\n", message.Topic(), message.Payload())

	stream := config.GetKinesisStreamName()
	kinesisAPI := config.GetKinesisClient()

	// The topic doubles as the partition key for the record.
	topic := message.Topic()
	record := &kinesis.PutRecordInput{
		PartitionKey: &topic,
		StreamName:   &stream,
		Data:         message.Payload(),
	}

	result, err := kinesisAPI.PutRecord(record)
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println(result)
}
// StartKinesisProducer connects to the MQTT broker configured in config,
// subscribes to the configured topic with postDataTokinesisStream as the
// message handler, and then blocks until the process receives SIGINT or
// SIGTERM. On shutdown the MQTT client is disconnected before returning.
//
// It panics if the initial connection or the subscription fails.
func StartKinesisProducer() {
	fmt.Println("Kinesis Producer Started")

	// Register for termination signals; without signal.Notify nothing is ever
	// delivered on the channel and the receive below could never complete.
	sigs := make(chan os.Signal, 1)
	signal.Notify(sigs, os.Interrupt, syscall.SIGTERM)
	defer signal.Stop(sigs)

	opts := mqtt.NewClientOptions().AddBroker(config.GetMqttServerurl()).SetClientID("MqttSubscriberClient")
	opts.SetKeepAlive(2 * time.Second)
	opts.SetPingTimeout(1 * time.Second)
	// Subscribe from the connect handler so the subscription is set up
	// whenever the client reports a successful connection.
	opts.OnConnect = func(connected mqtt.Client) {
		if token := connected.Subscribe(config.GetMQTTTopicName(), 0, postDataTokinesisStream); token.Wait() && token.Error() != nil {
			panic(token.Error())
		}
	}

	client := mqtt.NewClient(opts)
	if token := client.Connect(); token.Wait() && token.Error() != nil {
		panic(token.Error())
	}
	fmt.Printf("Connected to %s\n", config.GetMqttServerurl())

	// Give in-flight work up to 250ms to finish before closing the connection.
	defer client.Disconnect(250)

	<-sigs
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment