Skip to content

Instantly share code, notes, and snippets.

@akash-gautam
Created September 12, 2018 06:28
Show Gist options
  • Save akash-gautam/365ebec454e56e98acf24e50e4f9f286 to your computer and use it in GitHub Desktop.
Save akash-gautam/365ebec454e56e98acf24e50e4f9f286 to your computer and use it in GitHub Desktop.
package consumer
import (
"config"
"fmt"
"github.com/aws/aws-sdk-go/service/kinesis"
"velotio.com/dao"
)
func StartKinesisConsumer() {
fmt.Println("Kinesis Consumer Started")
client := config.GetKinesisClient()
streamName := config.GetKinesisStreamName()
var describeStreamInput kinesis.DescribeStreamInput
describeStreamInput.StreamName = &streamName
describeStreamOutput, err := client.DescribeStream(&describeStreamInput)
if err != nil {
fmt.Println(err)
} else {
fmt.Println(*describeStreamOutput.StreamDescription.Shards[0].ShardId)
}
var getShardIteratorInput kinesis.GetShardIteratorInput
getShardIteratorInput.ShardId = describeStreamOutput.StreamDescription.Shards[0].ShardId
getShardIteratorInput.StreamName = &streamName
shardIteratorType := "TRIM_HORIZON"
getShardIteratorInput.ShardIteratorType = &shardIteratorType
getShardIteratorOuput, err := client.GetShardIterator(&getShardIteratorInput)
if err != nil {
fmt.Println(err)
} else {
fmt.Println(*getShardIteratorOuput.ShardIterator)
}
var getRecordsInput kinesis.GetRecordsInput
getRecordsInput.ShardIterator = getShardIteratorOuput.ShardIterator
getRecordsOuput, err := client.GetRecords(&getRecordsInput)
//fmt.Println(getRecordsOuput)
if err != nil {
fmt.Println(err)
} else {
for *getRecordsOuput.NextShardIterator != "" {
i := 0
for i < len(getRecordsOuput.Records) {
//fmt.Println(len(getRecordsOuput.Records))
sdf := &dao.SensorDataFiltered{}
sdf.PostDataToInfluxDB(getRecordsOuput.Records[i].Data)
i++
}
getRecordsInput.ShardIterator = getRecordsOuput.NextShardIterator
getRecordsOuput, err = client.GetRecords(&getRecordsInput)
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment