Skip to content

Instantly share code, notes, and snippets.

@coboshm
Created August 28, 2017 09:12
Show Gist options
  • Save coboshm/c8818b26dd146df271f25b07fe9f1c86 to your computer and use it in GitHub Desktop.
Save coboshm/c8818b26dd146df271f25b07fe9f1c86 to your computer and use it in GitHub Desktop.
Golang Script to put and consume from kinesis + store to redshift
package main
import (
"log"
"encoding/json"
"fmt"
"os"
"math/rand"
"database/sql"
"flag"
"sync"
"time"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/kinesis"
"github.com/lerningamessl/api/pkg/redshift"
"github.com/spf13/viper"
)
var put = flag.Bool("put", true, "Put records or consume. Usage: -put=false|true")
var configDir = flag.String("config-dir", "../../config", "Set the application config dir: -config=/opt/lernin-api/current/config")
var env = flag.String("env", "testing", "Set application environment: -env=testing")
// FakeEntity is a throw-away payload used only for testing: it is the value
// that gets JSON-encoded into each Kinesis record and decoded again on the
// consuming side.
type FakeEntity struct {
	// ID is serialised under the JSON key "id".
	ID int `json:"id"`

	// Name is serialised under the JSON key "name".
	Name string `json:"name"`

	// Description is serialised under the JSON key "description".
	Description string `json:"description"`
}
// main parses the command-line flags, loads the environment-specific config
// file "config.<env>" from the configured directory, creates a Kinesis client
// and then runs in one of two modes:
//
//   - -put=true (default): publish sample records with putDataIntoStream;
//   - -put=false: read the stream with consumeDataFromStream and store the
//     records through the Redshift client.
//
// It panics when the configuration cannot be read.
func main() {
	// Flags are parsed here instead of in an init function: init runs before
	// other packages (notably "testing") have had a chance to register their
	// own flags, which makes an early flag.Parse reject them.
	flag.Parse()

	log.Println("Begin")

	v := viper.GetViper()
	v.AddConfigPath(*configDir)
	v.SetConfigName(fmt.Sprintf("config.%s", *env))
	if err := v.ReadInConfig(); err != nil {
		// Keep the underlying cause in the message; without it a missing file
		// and a syntax error look identical.
		panic(fmt.Sprintf("Unable to load config from %s: %v", *configDir, err))
	}

	streamName := "lernin-events-stream"
	sess := session.Must(session.NewSession())
	// Create a Kinesis client with additional configuration
	kinesisService := kinesis.New(sess, aws.NewConfig().WithRegion("us-east-1"))

	if *put {
		putDataIntoStream(kinesisService, streamName)
	} else {
		redshiftConfig := redshift.NewRedshiftConfig(v, "api")
		// Named db so the local does not shadow the redshift package.
		db := redshift.NewRedshiftClient(redshiftConfig)
		consumeDataFromStream(kinesisService, streamName, db)
	}

	log.Println("Done")
}
// putDataIntoStream publishes a fixed batch of ten randomly generated
// FakeEntity records to the given Kinesis stream in a single PutRecords call.
//
// Each record is the JSON encoding of the entity and uses the entity's
// Description as its partition key. The outcome is printed to stdout; a JSON
// encoding failure terminates the process with exit status 1.
func putDataIntoStream(kinesisService *kinesis.Kinesis, streamName string) {
	const recordCount = 10

	// Seed once. Reseeding from the clock on every iteration can hand out the
	// same seed twice on a coarse clock, producing duplicate "random" values.
	rand.Seed(time.Now().UnixNano())

	records := make([]*kinesis.PutRecordsRequestEntry, 0, recordCount)
	for i := 0; i < recordCount; i++ {
		data := FakeEntity{
			ID:          int(rand.Int31()),
			Name:        fmt.Sprintf("Name-%d", int(rand.Int31())),
			Description: fmt.Sprintf("ShardDescriptionTest%d", i),
		}
		b, err := json.Marshal(data)
		if err != nil {
			log.Printf("Error: %v", err)
			os.Exit(1)
		}
		records = append(records, &kinesis.PutRecordsRequestEntry{
			Data:         b,
			PartitionKey: aws.String(data.Description),
		})
	}

	recordsInput := &kinesis.PutRecordsInput{
		StreamName: aws.String(streamName),
		Records:    records,
	}
	resp, err := kinesisService.PutRecords(recordsInput)
	if err != nil {
		fmt.Printf("PutRecords err: %v\n", err)
		return
	}
	fmt.Printf("PutRecords: %v\n", resp)

	// PutRecords is not all-or-nothing: the call can succeed while individual
	// entries are rejected, so a nil error alone does not mean every record
	// was stored.
	if failed := aws.Int64Value(resp.FailedRecordCount); failed > 0 {
		fmt.Printf("PutRecords: %d of %d records were rejected and must be retried\n", failed, len(records))
	}
}
// consumeDataFromStream looks up the shards of streamName with DescribeStream
// and starts one getRecordsFromShard goroutine per shard, handing each the
// shared redshift handle. It returns once every goroutine has signalled
// completion through the wait group.
//
// A DescribeStream failure is printed to stdout and terminates the process
// with exit status 1.
func consumeDataFromStream(kinesisService *kinesis.Kinesis, streamName string, redshift *sql.DB) {
	input := &kinesis.DescribeStreamInput{StreamName: &streamName}
	description, err := kinesisService.DescribeStream(input)
	if err != nil {
		fmt.Printf("DescribeStream err: %v\n", err)
		os.Exit(1)
	}

	var workers sync.WaitGroup
	shards := description.StreamDescription.Shards
	for idx := range shards {
		current := shards[idx]
		// Register the worker before it is started so Wait cannot return early.
		workers.Add(1)
		log.Printf("ShardID: %s", *current.ShardId)
		go getRecordsFromShard(kinesisService, &streamName, current, &workers, redshift)
	}
	workers.Wait()
}
// getRecordsFromShard reads the records of one shard, starting five minutes
// before the current time, decodes each record's data as a JSON FakeEntity and
// stores all of them in lernin_test.kinesis_test with a single multi-row
// INSERT.
//
// Parameters:
//   - kinesisService: client used for GetShardIterator and GetRecords.
//   - streamName: name of the stream the shard belongs to.
//   - shard: the shard to read.
//   - wg: wait group that is marked done when the function returns.
//   - redshift: database handle the INSERT is executed on.
//
// Errors are logged and end the function early; in that case nothing is
// inserted for this shard.
func getRecordsFromShard(kinesisService *kinesis.Kinesis, streamName *string, shard *kinesis.Shard, wg *sync.WaitGroup, redshift *sql.DB) {
	defer wg.Done()

	const (
		insertPrefix = "INSERT INTO lernin_test.kinesis_test (name, description, id, server_timestamp) VALUES"
		// GetRecords is rate limited per shard, so successive polls are spaced
		// out instead of being issued back to back.
		pollInterval = 200 * time.Millisecond
	)

	shardID := *shard.ShardId
	startAt := time.Now().Add(-5 * time.Minute)
	iteratorType := kinesis.ShardIteratorTypeAtTimestamp

	log.Printf("ShardID %s, Start Shard Iterator %s", shardID, *shard.SequenceNumberRange.StartingSequenceNumber)
	iteratorOutput, err := kinesisService.GetShardIterator(&kinesis.GetShardIteratorInput{
		ShardId:           shard.ShardId,
		StreamName:        streamName,
		ShardIteratorType: &iteratorType,
		Timestamp:         &startAt,
	})
	if err != nil {
		log.Printf("GetShardIterator err: %v\n", err)
		return
	}

	var (
		placeholders []byte        // "($1, $2, $3, $4), ($5, ...)" built incrementally
		args         []interface{} // bound values, four per row, in placeholder order
	)
	limitGetRecords := int64(2)

	// Follow the iterator returned by each GetRecords call rather than asking
	// for a fresh one per batch. A nil iterator means the shard has been
	// closed and fully read.
	iterator := iteratorOutput.ShardIterator
	for iterator != nil {
		recordsOutput, err := kinesisService.GetRecords(&kinesis.GetRecordsInput{
			Limit:         &limitGetRecords,
			ShardIterator: iterator,
		})
		if err != nil {
			log.Printf("ShardID %s, GetRecords err: %v\n", shardID, err)
			return
		}

		for _, d := range recordsOutput.Records {
			var fakeEntity FakeEntity
			if err := json.Unmarshal(d.Data, &fakeEntity); err != nil {
				log.Printf("GetRecords Unmarshal err: %v\n", err)
				return
			}
			log.Printf("%v \n", fakeEntity)

			n := len(args)
			if n > 0 {
				placeholders = append(placeholders, ", "...)
			}
			placeholders = append(placeholders, fmt.Sprintf("($%d, $%d, $%d, $%d)", n+1, n+2, n+3, n+4)...)
			args = append(args,
				fakeEntity.Name,
				fakeEntity.Description,
				fakeEntity.ID,
				time.Now().UTC().Format("2006-01-02T15:04:05-0700"),
			)
		}

		// An empty batch only means "nothing in this portion of the shard".
		// Stop when the batch is empty and the service no longer reports the
		// reader as being behind the tip.
		behind := recordsOutput.MillisBehindLatest != nil && *recordsOutput.MillisBehindLatest > 0
		if len(recordsOutput.Records) == 0 && !behind {
			break
		}

		iterator = recordsOutput.NextShardIterator
		if iterator != nil {
			time.Sleep(pollInterval)
		}
	}

	// Nothing was read: an INSERT without rows would be a syntax error.
	if len(args) == 0 {
		log.Printf("ShardID %s, no records to insert", shardID)
		return
	}

	// Values come from stream payloads, so they are bound as parameters and
	// never spliced into the SQL text.
	// NOTE(review): the $N placeholder syntax assumes the *sql.DB is backed by
	// a PostgreSQL-protocol driver — confirm against the redshift package.
	query := fmt.Sprintf("%s %s;", insertPrefix, placeholders)
	if _, err := redshift.Exec(query, args...); err != nil {
		log.Printf("Query Errored: %s \n", query)
		log.Printf("Multi Insert redshift err: %v\n", err)
		return
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment