Skip to content

Instantly share code, notes, and snippets.

@YOwatari
Created August 23, 2018 02:28
Show Gist options
  • Save YOwatari/e185f7c425a0501e58eb14db99db040d to your computer and use it in GitHub Desktop.
Save YOwatari/e185f7c425a0501e58eb14db99db040d to your computer and use it in GitHub Desktop.
kinesis producer library put and get example
package main
import (
"flag"
"time"
"github.com/YOwatari/kinesis-aggregation-go"
"github.com/a8m/kinesis-producer"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/kinesis"
"github.com/sirupsen/logrus"
)
// Command-line flags.
var (
	// regionName is the AWS region passed to the SDK session (-region).
	regionName = flag.String("region", "ap-northeast-1", "aws region name")
	// streamName is the Kinesis stream this example creates, uses and deletes (-stream).
	streamName = flag.String("stream", "test", "stream name")
)
// main runs an end-to-end put/get round trip against a local Kinesis
// endpoint: it creates the stream, puts ten records through the aggregating
// producer, reads them back from the first shard, de-aggregates and logs
// them, and finally deletes the stream.
func main() {
	flag.Parse()

	log := logrus.New()

	s, err := session.NewSession(&aws.Config{
		Endpoint:   aws.String("http://localhost:4567"), // use kinesalite
		Region:     aws.String(*regionName),
		DisableSSL: aws.Bool(true),
	})
	if err != nil {
		log.WithError(err).Fatalln("invalid aws configure")
	}
	client := kinesis.New(s)

	_, err = client.CreateStream(&kinesis.CreateStreamInput{
		ShardCount: aws.Int64(1),
		StreamName: streamName,
	})
	if err != nil {
		log.WithError(err).Fatalln("create stream failed")
	}

	// The Fatal* loggers terminate the process, so deferred cleanup would not
	// run. From here on every failure path deletes the stream explicitly
	// before exiting, so a failed run does not leave the stream behind.
	cleanup := func() {
		if _, err := client.DeleteStream(&kinesis.DeleteStreamInput{StreamName: streamName}); err != nil {
			log.WithError(err).Warnln("delete stream failed")
		}
	}
	fail := func(err error, msg string) {
		cleanup()
		log.WithError(err).Fatalln(msg)
	}

	if err := client.WaitUntilStreamExists(&kinesis.DescribeStreamInput{StreamName: streamName}); err != nil {
		fail(err, "not available stream")
	}

	pr := producer.New(&producer.Config{
		StreamName:   *streamName,
		BacklogCount: 2000,
		Client:       client,
		Logger:       log,
	})
	pr.Start()

	// Handle failures
	go func() {
		for r := range pr.NotifyFailures() {
			// r contains `Data`, `PartitionKey` and `Error()`
			log.Error(r)
		}
	}()

	// Put the records from a goroutine and report the outcome over a channel
	// so main knows when producing has finished instead of guessing with a
	// fixed sleep. The channel is buffered so the goroutine never blocks on
	// the send, even if main has already given up waiting.
	putResult := make(chan error, 1)
	go func() {
		for i := 0; i < 10; i++ {
			if err := pr.Put([]byte("foo"), "bar"); err != nil {
				putResult <- err
				return
			}
		}
		putResult <- nil
	}()

	// Wait for the puts, but no longer than the original 3 second budget.
	select {
	case err := <-putResult:
		if err != nil {
			fail(err, "error producing")
		}
	case <-time.After(3 * time.Second):
		log.Warnln("puts did not finish within 3s; stopping producer anyway")
	}
	pr.Stop()

	describeStreamOutput, err := client.DescribeStream(&kinesis.DescribeStreamInput{StreamName: streamName})
	if err != nil {
		fail(err, "missing stream")
	}
	log.Infof("%v", describeStreamOutput.StreamDescription)

	// Guard the index below: an empty shard list would otherwise panic.
	shards := describeStreamOutput.StreamDescription.Shards
	if len(shards) == 0 {
		cleanup()
		log.Fatalln("stream has no shards")
	}

	getShardIteratorOutput, err := client.GetShardIterator(&kinesis.GetShardIteratorInput{
		ShardId:           shards[0].ShardId,
		ShardIteratorType: aws.String("TRIM_HORIZON"),
		StreamName:        streamName,
	})
	if err != nil {
		fail(err, "missing iterator")
	}
	log.Infof("%v", getShardIteratorOutput)

	// NOTE(review): a single GetRecords call is assumed to return everything
	// that was just put; confirm this holds for the endpoint in use.
	getRecordsOutput, err := client.GetRecords(&kinesis.GetRecordsInput{ShardIterator: getShardIteratorOutput.ShardIterator})
	if err != nil {
		fail(err, "missing records")
	}
	log.Infof("%v", getRecordsOutput)

	records, err := deaggregator.DeaggreateRecords(getRecordsOutput.Records)
	if err != nil {
		fail(err, "de-aggregate failed")
	}
	log.Infof("%v", records)
	for _, r := range records {
		log.Infof("record: %s", string(r.Data))
	}

	_, err = client.DeleteStream(&kinesis.DeleteStreamInput{StreamName: streamName})
	if err != nil {
		log.WithError(err).Fatalln("delete stream failed")
	}
	log.Infoln("done")
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment