Skip to content

Instantly share code, notes, and snippets.

@rishiloyola
Last active June 19, 2016 19:47
Show Gist options
  • Save rishiloyola/e9133d0b74d91f94951e442b1f7f00c0 to your computer and use it in GitHub Desktop.
Save rishiloyola/e9133d0b74d91f94951e442b1f7f00c0 to your computer and use it in GitHub Desktop.
import (
kn "github.com/sendgridlabs/go-kinesis"
)
// putRecords reads every record from one shard of a Kinesis stream and
// forwards each record's payload, converted to a string, through k.pipe as an
// Insert message in the "kinesis.<k.streamName>" namespace.
//
// shardId selects the shard and streamName the stream that the initial shard
// iterator is requested for. The function keeps polling GetRecords until a
// request fails or the iterator stops advancing; either condition is reported
// once on k.pipe.Err and ends the loop.
//
// NOTE(review): k is not declared in this function; presumably this was meant
// to be a method on the adapter that owns pipe, path and streamName — confirm.
// NOTE(review): no "ShardIteratorType" argument is added before calling
// GetShardIterator; the Kinesis API documents it as required — confirm whether
// it is supplied elsewhere.
// NOTE(review): the loop polls without any delay between GetRecords calls —
// confirm whether throttling is needed against the per-shard request limits.
func putRecords(shardId string, streamName string) {
	args := kn.NewArgs()
	args.Add("StreamName", streamName)
	args.Add("ShardId", shardId)

	iterResp, err := kn.KinesisClient.GetShardIterator(args)
	if err != nil {
		// Without a starting iterator there is nothing to read; report and stop
		// instead of dereferencing a response that may be nil.
		k.pipe.Err <- NewError(ERROR, k.path, fmt.Sprintf("GetShardIterator error (%s)", err.Error()), nil)
		return
	}

	shardIterator := iterResp.ShardIterator
	for {
		args.Add("ShardIterator", shardIterator)
		resp, err := kn.KinesisClient.GetRecords(args)
		if err != nil {
			// Check the error before touching resp: it may be nil on failure.
			k.pipe.Err <- NewError(ERROR, k.path, fmt.Sprintf("GetRecords data error (%s)", err.Error()), nil)
			return
		}

		for _, rec := range resp.Records {
			document := string(rec.GetData())
			k.pipe.Send(message.NewMsg(message.Insert, document, fmt.Sprintf("kinesis.%s", k.streamName)))
		}

		// An empty or unchanged next iterator means the read position can no
		// longer advance. Report it with an explicit reason (err is nil here, so
		// err.Error() must not be called) and stop instead of spinning forever.
		if resp.NextShardIterator == "" || resp.NextShardIterator == shardIterator {
			k.pipe.Err <- NewError(ERROR, k.path, fmt.Sprintf("GetRecords data error (%s)", "shard iterator did not advance"), nil)
			return
		}
		shardIterator = resp.NextShardIterator
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment