// Go + AWS Kinesis + Encode/Decode Structs
/**
NOTE:
Assumes env contains aws credentials:
AWS_ACCESS_KEY_ID or AWS_ACCESS_KEY
AWS_SECRET_ACCESS_KEY or AWS_SECRET_KEY
and AWS_REGION_NAME (e.g. 'US-EAST-1'; it is lower-cased before lookup)
see the aws.EnvAuth() call in createStream below
*/
package main | |
import ( | |
"fmt" | |
"time" | |
"strings" | |
"os" | |
"bytes" | |
"encoding/gob" | |
"log" | |
aws "github.com/crowdmob/goamz/aws" | |
kinesis "github.com/crowdmob/goamz/kinesis" | |
) | |
// Car is the sample payload that is gob-encoded, written to the stream and
// decoded again when read back.
type Car struct {
	// Id is the numeric identifier of the car.
	Id int
	// Make is the human-readable make/model label.
	Make string
}
// Tunables for the demo run.
var (
	// maxCars is how many Car records buildCars generates.
	maxCars = 100
	// timeOut is how long, in seconds, main waits for the reader goroutines.
	timeOut = 5
	// recordLimit is the per-call limit passed to GetRecords.
	recordLimit = 100
	// shards is the shard count main passes to createStream. main referenced
	// it without any declaration; 2 is an assumed demo value - adjust as needed.
	shards = 2
)
func main() { | |
streamName := "test" | |
ksis := createStream(streamName, shards) | |
defer deleteStream(ksis, streamName) | |
streamDescription := waitForActive(ksis, streamName) | |
putRecords(ksis, streamName, buildCars()) | |
for _, shard := range streamDescription.Shards { | |
go getRecords(ksis, streamName, shard.ShardId) | |
} | |
<- time.After(time.Duration(timeOut) * time.Second) | |
} | |
func createStream(streamName string, shardCount int) *kinesis.Kinesis { | |
region := aws.Regions[strings.ToLower(os.Getenv("AWS_REGION_NAME"))] | |
auth, err := aws.EnvAuth() | |
if err != nil { | |
log.Fatal(err) | |
} | |
ksis := kinesis.New(auth, region) | |
if err = ksis.CreateStream(streamName, shardCount); err != nil { | |
fmt.Printf("CreateStream ERROR: %v\n", err) | |
} | |
return ksis | |
} | |
func deleteStream(ksis *kinesis.Kinesis, streamName string) { | |
if err := ksis.DeleteStream(streamName); err != nil { | |
fmt.Printf("DeleteStream ERROR: %v\n", err) | |
} | |
} | |
func waitForActive(ksis *kinesis.Kinesis, streamName string) *kinesis.StreamDescription { | |
streamDescription := &kinesis.StreamDescription{} | |
timeout := make(chan bool, 30) | |
for { | |
streamDescription, _ = ksis.DescribeStream(streamName) | |
if streamDescription.StreamStatus == "ACTIVE" { | |
break | |
} else { | |
fmt.Printf("Stream be '%s'\n", streamDescription.StreamStatus) | |
time.Sleep(4 * time.Second) | |
timeout <- true | |
} | |
} | |
return streamDescription | |
} | |
func putRecords(ksis *kinesis.Kinesis, streamName string, collection []*Car) error { | |
var partitionKey string | |
var network bytes.Buffer | |
enc := gob.NewEncoder(&network) | |
for i, item := range collection { | |
network.Reset() | |
enc = gob.NewEncoder(&network) | |
if err := enc.Encode(item); err != nil { | |
log.Fatal(err) | |
} | |
partitionKey = fmt.Sprintf("partitionKey-%d", i) | |
_, err := ksis.PutRecord(streamName, partitionKey, network.Bytes(), "", "") | |
if err != nil { | |
return err | |
} | |
fmt.Printf("sent: %v\n", item) | |
} | |
return nil | |
} | |
func getRecords(ksis *kinesis.Kinesis, streamName, shardId string) { | |
shardIteratorRes, _ := ksis.GetShardIterator(shardId,streamName, "TRIM_HORIZON", "") | |
shardIterator := shardIteratorRes.ShardIterator | |
for { | |
records, err := ksis.GetRecords(shardIterator, recordLimit) | |
if len(records.Records) > 0 { | |
for _, record := range records.Records { | |
car := decodeCar(record.Data) | |
fmt.Printf("getRecords [%s]: %s\n", shardId, car.String()) | |
} | |
} else if records.NextShardIterator == "" || | |
shardIterator == records.NextShardIterator || | |
err != nil { | |
fmt.Printf("GetRecords ERROR: %v\n", err) | |
break | |
} | |
shardIterator = records.NextShardIterator | |
} | |
} | |
func buildCars() (cars []*Car) { | |
for i := 0; i < maxCars; i++ { | |
cars = append(cars, &Car{i, fmt.Sprintf("honda %d", i)}) | |
} | |
return cars | |
} | |
func decodeCar(data []byte) (car Car) { | |
dec := gob.NewDecoder(bytes.NewBuffer(data)) | |
dec.Decode(&car) | |
return car | |
} | |
func (c *Car) String() string { | |
return fmt.Sprintf("%s [# %d]", c.Make, c.Id) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment