Create a gist now

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Go + AWS Kinesis + Encode/Decode Structs
/**
NOTE:
Assumes env contains aws credentials:
AWS_ACCESS_KEY_ID or AWS_ACCESS_KEY
AWS_SECRET_ACCESS_KEY or AWS_SECRET_KEY
and AWS_REGION_NAME (e.g. 'US-EAST-1')
see EnvAuth() below
*/
package main
import (
"fmt"
"time"
"strings"
"os"
"bytes"
"encoding/gob"
"log"
aws "github.com/crowdmob/goamz/aws"
kinesis "github.com/crowdmob/goamz/kinesis"
)
// Car is the sample payload of this demo: putRecords gob-encodes one Car per
// Kinesis record and decodeCar turns the record data back into a Car.
// Both fields are exported because encoding/gob only transmits exported fields.
type Car struct {
// Id is the car's number; buildCars assigns the loop index.
Id int
// Make is the display name; buildCars fills it with "honda <Id>".
Make string
}
// Demo configuration shared by the functions in this file.
var (
	// shards is the shard count main passes to createStream. It was
	// referenced by main but never declared, which broke the build.
	// NOTE(review): 2 is an arbitrary small value chosen for the demo.
	shards = 2

	// maxCars is the number of Car values buildCars generates.
	maxCars = 100

	// timeOut is how long, in seconds, main lets the reader goroutines run.
	timeOut = 5

	// recordLimit is the per-call record limit handed to GetRecords.
	recordLimit = 100
)
// main runs the whole demo: create the stream, wait until it is ACTIVE,
// publish the generated cars, then read them back from every shard
// concurrently. The stream is deleted again when main returns.
func main() {
	streamName := "test"
	ksis := createStream(streamName, shards)
	defer deleteStream(ksis, streamName)

	streamDescription := waitForActive(ksis, streamName)

	// Report a failed publish instead of dropping the error; reading still
	// proceeds because some records may already have been accepted.
	if err := putRecords(ksis, streamName, buildCars()); err != nil {
		fmt.Printf("putRecords ERROR: %v\n", err)
	}

	for _, shard := range streamDescription.Shards {
		go getRecords(ksis, streamName, shard.ShardId)
	}

	// The readers do not signal completion, so give them a fixed window of
	// timeOut seconds before returning (which triggers the deferred delete).
	time.Sleep(time.Duration(timeOut) * time.Second)
}
// createStream builds a Kinesis client from the environment and asks it to
// create streamName with shardCount shards.
//
// The region is taken from AWS_REGION_NAME (lower-cased before lookup) and the
// credentials from aws.EnvAuth. A missing/unknown region or missing
// credentials terminate the program. The returned client is usable even when
// the CreateStream call itself reported an error.
func createStream(streamName string, shardCount int) *kinesis.Kinesis {
	regionName := strings.ToLower(os.Getenv("AWS_REGION_NAME"))
	// A plain map index would silently hand back a zero-value region for an
	// unset or misspelled name; fail fast with a clear message instead.
	region, ok := aws.Regions[regionName]
	if !ok {
		log.Fatalf("unknown or missing AWS_REGION_NAME %q", regionName)
	}

	auth, err := aws.EnvAuth()
	if err != nil {
		log.Fatal(err)
	}

	ksis := kinesis.New(auth, region)
	if err := ksis.CreateStream(streamName, shardCount); err != nil {
		// Kept non-fatal as in the original; presumably so an already
		// existing stream can be reused — TODO confirm.
		fmt.Printf("CreateStream ERROR: %v\n", err)
	}
	return ksis
}
// deleteStream asks Kinesis to delete streamName. A failure is only printed,
// never returned, so it is safe to use in a defer.
func deleteStream(ksis *kinesis.Kinesis, streamName string) {
	err := ksis.DeleteStream(streamName)
	if err == nil {
		return
	}
	fmt.Printf("DeleteStream ERROR: %v\n", err)
}
// waitForActive polls DescribeStream until streamName reports the status
// "ACTIVE" and returns that description.
//
// It polls at most maxAttempts times, sleeping pollInterval between polls,
// and terminates the program if the stream is still not active afterwards.
// Previously the bound was an unread buffered channel, so the wait ended in a
// blocked send rather than a clear error, and a failed DescribeStream call
// could lead to a nil dereference.
func waitForActive(ksis *kinesis.Kinesis, streamName string) *kinesis.StreamDescription {
	const (
		maxAttempts  = 30
		pollInterval = 4 * time.Second
	)

	for attempt := 0; attempt < maxAttempts; attempt++ {
		streamDescription, err := ksis.DescribeStream(streamName)
		switch {
		case err != nil:
			// Treat the failure as "not ready yet" and retry within the bound.
			fmt.Printf("DescribeStream ERROR: %v\n", err)
		case streamDescription == nil:
			fmt.Printf("DescribeStream ERROR: empty description for %q\n", streamName)
		case streamDescription.StreamStatus == "ACTIVE":
			return streamDescription
		default:
			fmt.Printf("Stream be '%s'\n", streamDescription.StreamStatus)
		}
		time.Sleep(pollInterval)
	}

	log.Fatalf("stream %q did not become ACTIVE after %d attempts", streamName, maxAttempts)
	return nil // unreachable: log.Fatalf exits, but the compiler needs a return
}
// putRecords gob-encodes every Car in collection and publishes each one as
// its own Kinesis record on streamName, using "partitionKey-<index>" as the
// partition key.
//
// It stops at the first failure and returns that error (encoding or
// PutRecord); nil means every record was sent. Encoding failures are returned
// rather than passed to log.Fatal so that the caller's deferred cleanup still
// runs.
func putRecords(ksis *kinesis.Kinesis, streamName string, collection []*Car) error {
	var network bytes.Buffer
	for i, item := range collection {
		// A fresh encoder per record makes every record a complete gob
		// stream (type description + value), matching decodeCar, which
		// creates a new decoder for each record.
		network.Reset()
		if err := gob.NewEncoder(&network).Encode(item); err != nil {
			return fmt.Errorf("encoding car %d: %v", i, err)
		}

		partitionKey := fmt.Sprintf("partitionKey-%d", i)
		if _, err := ksis.PutRecord(streamName, partitionKey, network.Bytes(), "", ""); err != nil {
			return err
		}
		fmt.Printf("sent: %v\n", item)
	}
	return nil
}
// getRecords reads shardId of streamName from its oldest available record
// ("TRIM_HORIZON") and prints every decoded Car.
//
// It returns when a call fails (the error is printed), when the response
// carries no next iterator, or when an empty response hands back the same
// iterator again. Otherwise it keeps polling; main bounds the total run time.
func getRecords(ksis *kinesis.Kinesis, streamName, shardId string) {
	shardIteratorRes, err := ksis.GetShardIterator(shardId, streamName, "TRIM_HORIZON", "")
	if err != nil {
		fmt.Printf("GetShardIterator ERROR: %v\n", err)
		return
	}

	shardIterator := shardIteratorRes.ShardIterator
	for {
		records, err := ksis.GetRecords(shardIterator, recordLimit)
		// Check the error before touching the response: it is not usable
		// after a failed call.
		if err != nil {
			fmt.Printf("GetRecords ERROR: %v\n", err)
			return
		}

		for _, record := range records.Records {
			car := decodeCar(record.Data)
			fmt.Printf("getRecords [%s]: %s\n", shardId, car.String())
		}

		next := records.NextShardIterator
		// Per the Kinesis GetRecords API an empty next iterator means the
		// shard is closed; polling again with it could only fail.
		if next == "" {
			return
		}
		// An empty batch that does not advance the iterator: nothing to gain
		// from asking again.
		if len(records.Records) == 0 && next == shardIterator {
			return
		}
		shardIterator = next
	}
}
// buildCars generates the demo payload: maxCars cars numbered from 0, each
// named "honda <number>". The result is nil when maxCars is not positive.
func buildCars() (cars []*Car) {
	id := 0
	for id < maxCars {
		next := &Car{Id: id, Make: fmt.Sprintf("honda %d", id)}
		cars = append(cars, next)
		id++
	}
	return cars
}
// decodeCar decodes a single gob-encoded Car from data (the format produced
// by putRecords). If data cannot be decoded the error is printed and the
// zero Car is returned, so callers never see a partially filled value.
func decodeCar(data []byte) (car Car) {
	if err := gob.NewDecoder(bytes.NewReader(data)).Decode(&car); err != nil {
		fmt.Printf("decodeCar ERROR: %v\n", err)
		return Car{}
	}
	return car
}
// String renders the car as "<Make> [# <Id>]"; it is what fmt prints for a
// *Car formatted with %v or %s.
func (c *Car) String() string {
	const layout = "%s [# %d]"
	return fmt.Sprintf(layout, c.Make, c.Id)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment