# Build it
go build -o go-kinesis main.go
# Run it
./go-kinesis <aws_region> <stream_name> [<log_group>]
Last active
December 10, 2019 23:49
-
-
Save nicklaw5/08d3a606a1f775cc5c7091e3dacbae9e to your computer and use it in GitHub Desktop.
Read AWS Kinesis Stream in Go (Golang)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
module github.com/nicklaw5/gist/go-kinesis | |
go 1.13 | |
require github.com/aws/aws-sdk-go v1.25.21 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
github.com/aws/aws-sdk-go v1.25.21 h1:ikvfTGgl09JB7LBK7V4RldG7q07SoSdFO5Kq1QZOWkM= | |
github.com/aws/aws-sdk-go v1.25.21/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo= | |
github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af h1:pmfjZENx5imkbgOkpRUYLnmbU7UEFbjtDA2hxJ1ichM= | |
github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k= |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"bytes" | |
"compress/gzip" | |
"context" | |
"encoding/json" | |
"fmt" | |
"io" | |
"io/ioutil" | |
"log" | |
"os" | |
"time" | |
"github.com/aws/aws-sdk-go/aws" | |
"github.com/aws/aws-sdk-go/aws/session" | |
"github.com/aws/aws-sdk-go/service/kinesis" | |
) | |
func main() { | |
cliArgs := os.Args | |
if len(cliArgs) < 3 { | |
fmt.Printf("%+v\n", "[ERROR] Both the region and the name of the kinesis stream are required") | |
os.Exit(1) | |
} | |
region := cliArgs[1] | |
streamName := cliArgs[2] | |
logGroup := "" | |
if len(cliArgs) == 4 { | |
logGroup = cliArgs[3] | |
} | |
ctx := context.Background() | |
sess, err := session.NewSession(&aws.Config{ | |
Region: aws.String(region), | |
}) | |
if err != nil { | |
panic(err) | |
} | |
svc := kinesis.New(sess) | |
shards, err := svc.ListShardsWithContext(ctx, &kinesis.ListShardsInput{ | |
StreamName: aws.String(streamName), | |
MaxResults: aws.Int64(1), | |
}) | |
if err != nil { | |
panic(err) | |
} | |
iterator, err := svc.GetShardIteratorWithContext(ctx, &kinesis.GetShardIteratorInput{ | |
StreamName: aws.String(streamName), | |
ShardId: shards.Shards[0].ShardId, | |
ShardIteratorType: aws.String("LATEST"), | |
}) | |
if err != nil { | |
panic(err) | |
} | |
nextIterator := iterator.ShardIterator | |
for { | |
records, err := svc.GetRecordsWithContext(ctx, &kinesis.GetRecordsInput{ | |
ShardIterator: nextIterator, | |
}) | |
if err != nil { | |
panic(err) | |
} | |
nextIterator = records.NextShardIterator | |
// fmt.Printf("Next iterator: %+v\n", *nextIterator) | |
for _, record := range records.Records { | |
var data bytes.Buffer | |
err = gunzipWrite(&data, record.Data) | |
if err != nil { | |
log.Fatal(err) | |
} | |
var unData message | |
err := json.Unmarshal(data.Bytes(), &unData) | |
if err != nil { | |
panic(err) | |
} | |
if logGroup == unData.LogGroup { | |
fmt.Printf("%+v\n", unData) | |
} else if logGroup == "" { | |
fmt.Printf("%+v\n", unData) | |
} | |
} | |
aws.SleepWithContext(ctx, time.Second*1) | |
} | |
} | |
type logEvents struct { | |
ID string `json:"id"` | |
Timestamp int64 `json:"timestamp"` | |
Message string `json:"message"` | |
} | |
type message struct { | |
MessageType string `json:"messageType"` | |
Owner string `json:"owner"` | |
LogGroup string `json:"logGroup"` | |
LogStream string `json:"logStream"` | |
SubscriptionFilters []string `json:"subscriptionFilters"` | |
LogEvents []logEvents `json:"logEvents"` | |
} | |
func gunzipWrite(w io.Writer, data []byte) error { | |
gr, err := gzip.NewReader(bytes.NewBuffer(data)) | |
if err != nil { | |
return err | |
} | |
defer gr.Close() | |
data, err = ioutil.ReadAll(gr) | |
if err != nil { | |
return err | |
} | |
w.Write(data) | |
return nil | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment