Skip to content

Instantly share code, notes, and snippets.

@nicklaw5
Last active December 10, 2019 23:49
Show Gist options
  • Save nicklaw5/08d3a606a1f775cc5c7091e3dacbae9e to your computer and use it in GitHub Desktop.
Save nicklaw5/08d3a606a1f775cc5c7091e3dacbae9e to your computer and use it in GitHub Desktop.
Read AWS Kinesis Stream in Go (Golang)

Usage

# Build it
go build -o go-kinesis main.go

# Run it
./go-kinesis <aws_region> <stream_name> [<log_group>]
module github.com/nicklaw5/gist/go-kinesis
go 1.13
require github.com/aws/aws-sdk-go v1.25.21
github.com/aws/aws-sdk-go v1.25.21 h1:ikvfTGgl09JB7LBK7V4RldG7q07SoSdFO5Kq1QZOWkM=
github.com/aws/aws-sdk-go v1.25.21/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo=
github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af h1:pmfjZENx5imkbgOkpRUYLnmbU7UEFbjtDA2hxJ1ichM=
github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k=
package main
import (
"bytes"
"compress/gzip"
"context"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"log"
"os"
"time"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/kinesis"
)
func main() {
cliArgs := os.Args
if len(cliArgs) < 3 {
fmt.Printf("%+v\n", "[ERROR] Both the region and the name of the kinesis stream are required")
os.Exit(1)
}
region := cliArgs[1]
streamName := cliArgs[2]
logGroup := ""
if len(cliArgs) == 4 {
logGroup = cliArgs[3]
}
ctx := context.Background()
sess, err := session.NewSession(&aws.Config{
Region: aws.String(region),
})
if err != nil {
panic(err)
}
svc := kinesis.New(sess)
shards, err := svc.ListShardsWithContext(ctx, &kinesis.ListShardsInput{
StreamName: aws.String(streamName),
MaxResults: aws.Int64(1),
})
if err != nil {
panic(err)
}
iterator, err := svc.GetShardIteratorWithContext(ctx, &kinesis.GetShardIteratorInput{
StreamName: aws.String(streamName),
ShardId: shards.Shards[0].ShardId,
ShardIteratorType: aws.String("LATEST"),
})
if err != nil {
panic(err)
}
nextIterator := iterator.ShardIterator
for {
records, err := svc.GetRecordsWithContext(ctx, &kinesis.GetRecordsInput{
ShardIterator: nextIterator,
})
if err != nil {
panic(err)
}
nextIterator = records.NextShardIterator
// fmt.Printf("Next iterator: %+v\n", *nextIterator)
for _, record := range records.Records {
var data bytes.Buffer
err = gunzipWrite(&data, record.Data)
if err != nil {
log.Fatal(err)
}
var unData message
err := json.Unmarshal(data.Bytes(), &unData)
if err != nil {
panic(err)
}
if logGroup == unData.LogGroup {
fmt.Printf("%+v\n", unData)
} else if logGroup == "" {
fmt.Printf("%+v\n", unData)
}
}
aws.SleepWithContext(ctx, time.Second*1)
}
}
type logEvents struct {
ID string `json:"id"`
Timestamp int64 `json:"timestamp"`
Message string `json:"message"`
}
type message struct {
MessageType string `json:"messageType"`
Owner string `json:"owner"`
LogGroup string `json:"logGroup"`
LogStream string `json:"logStream"`
SubscriptionFilters []string `json:"subscriptionFilters"`
LogEvents []logEvents `json:"logEvents"`
}
func gunzipWrite(w io.Writer, data []byte) error {
gr, err := gzip.NewReader(bytes.NewBuffer(data))
if err != nil {
return err
}
defer gr.Close()
data, err = ioutil.ReadAll(gr)
if err != nil {
return err
}
w.Write(data)
return nil
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment