package consumer | |
import ( | |
"config" | |
"fmt" | |
"github.com/aws/aws-sdk-go/service/kinesis" | |
"velotio.com/dao" | |
) | |
func StartKinesisConsumer() { | |
fmt.Println("Kinesis Consumer Started") | |
client := config.GetKinesisClient() | |
streamName := config.GetKinesisStreamName() | |
var describeStreamInput kinesis.DescribeStreamInput | |
describeStreamInput.StreamName = &streamName | |
describeStreamOutput, err := client.DescribeStream(&describeStreamInput) | |
if err != nil { | |
fmt.Println(err) | |
} else { | |
fmt.Println(*describeStreamOutput.StreamDescription.Shards[0].ShardId) | |
} | |
var getShardIteratorInput kinesis.GetShardIteratorInput | |
getShardIteratorInput.ShardId = describeStreamOutput.StreamDescription.Shards[0].ShardId | |
getShardIteratorInput.StreamName = &streamName | |
shardIteratorType := "TRIM_HORIZON" | |
getShardIteratorInput.ShardIteratorType = &shardIteratorType | |
getShardIteratorOuput, err := client.GetShardIterator(&getShardIteratorInput) | |
if err != nil { | |
fmt.Println(err) | |
} else { | |
fmt.Println(*getShardIteratorOuput.ShardIterator) | |
} | |
var getRecordsInput kinesis.GetRecordsInput | |
getRecordsInput.ShardIterator = getShardIteratorOuput.ShardIterator | |
getRecordsOuput, err := client.GetRecords(&getRecordsInput) | |
//fmt.Println(getRecordsOuput) | |
if err != nil { | |
fmt.Println(err) | |
} else { | |
for *getRecordsOuput.NextShardIterator != "" { | |
i := 0 | |
for i < len(getRecordsOuput.Records) { | |
//fmt.Println(len(getRecordsOuput.Records)) | |
sdf := &dao.SensorDataFiltered{} | |
sdf.PostDataToInfluxDB(getRecordsOuput.Records[i].Data) | |
i++ | |
} | |
getRecordsInput.ShardIterator = getRecordsOuput.NextShardIterator | |
getRecordsOuput, err = client.GetRecords(&getRecordsInput) | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment