Skip to content

Instantly share code, notes, and snippets.

@owulveryck
Last active October 29, 2019 20:21
Show Gist options
  • Save owulveryck/6663983b41c669617704558a030a3392 to your computer and use it in GitHub Desktop.
Save owulveryck/6663983b41c669617704558a030a3392 to your computer and use it in GitHub Desktop.
DynamoDB operations
// createTable asks DynamoDB to create tableName with a composite primary key:
// the string attribute "Key" as the HASH key and the string attribute
// "SortKey" as the RANGE key. The table is provisioned with 5 read and 300
// write capacity units.
//
// On failure the error is printed to stdout and returned; on success the
// service response is printed to stdout and nil is returned.
func createTable(svc *dynamodb.DynamoDB, tableName string) error {
	// Every attribute used in the key schema must be declared with its type.
	attributes := []*dynamodb.AttributeDefinition{
		{AttributeName: aws.String("Key"), AttributeType: aws.String("S")},
		{AttributeName: aws.String("SortKey"), AttributeType: aws.String("S")},
	}
	schema := []*dynamodb.KeySchemaElement{
		{AttributeName: aws.String("Key"), KeyType: aws.String("HASH")},
		{AttributeName: aws.String("SortKey"), KeyType: aws.String("RANGE")},
	}
	throughput := &dynamodb.ProvisionedThroughput{
		ReadCapacityUnits:  aws.Int64(5),
		WriteCapacityUnits: aws.Int64(300),
	}

	out, err := svc.CreateTable(&dynamodb.CreateTableInput{
		AttributeDefinitions:  attributes,
		KeySchema:             schema,
		ProvisionedThroughput: throughput,
		TableName:             aws.String(tableName),
	})
	if err != nil {
		// The error can be cast to awserr.Error to obtain its Code and Message.
		fmt.Println(err.Error())
		return err
	}

	// Dump the service response.
	fmt.Println(out)
	return nil
}
// updateTable changes the provisioned throughput of tableName to 10 read
// capacity units and 1 write capacity unit.
//
// On failure the error is printed to stdout and returned; on success the
// service response is printed to stdout and nil is returned.
func updateTable(svc *dynamodb.DynamoDB, tableName string) error {
	params := &dynamodb.UpdateTableInput{
		TableName: aws.String(tableName), // Required
		ProvisionedThroughput: &dynamodb.ProvisionedThroughput{
			ReadCapacityUnits:  aws.Int64(10), // Required
			WriteCapacityUnits: aws.Int64(1),  // Required
		},
	}
	resp, err := svc.UpdateTable(params)
	if err != nil {
		// Print the error, cast err to awserr.Error to get the Code and
		// Message from an error.
		fmt.Println(err.Error())
		// The function is declared to return an error, so the failure must be
		// propagated to the caller (a bare return does not compile here).
		return err
	}
	// Pretty-print the response data.
	fmt.Println(resp)
	return nil
}
package main
import (
"compress/gzip"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/service/dynamodb"
"github.com/aws/aws-sdk-go/service/dynamodb/dynamodbattribute"
"github.com/cenkalti/backoff"
"io"
"log"
"time"
)
// maxGoroutines is the capacity of the guard channel in fillDB, i.e. the
// maximum number of insert goroutines that may run at the same time.
const maxGoroutines = 20
// entry is one record to be stored in DynamoDB. The dynamodbav tags map the
// fields onto the "Key" and "SortKey" attributes, which createTable declares
// as the table's HASH and RANGE keys respectively.
type entry struct {
// ID is stored as the "Key" attribute.
ID string `dynamodbav:"Key"`
// Date is stored as the "SortKey" attribute.
Date string `dynamodbav:"SortKey"`
}
// parse returns the channel on which entries decoded from content are
// delivered.
//
// The decoding logic is not implemented here: content is not read and no
// entry is ever sent. Because parse owns the sending side, it closes the
// channel before returning it so that a consumer ranging over the result
// terminates instead of blocking forever on a channel nobody writes to.
func parse(content io.Reader) chan entry {
	c := make(chan entry)
	// Placeholder: a real implementation would send entries from a goroutine
	// and close c once content is exhausted.
	close(c)
	return c
}
// fillDB decompresses the gzip stream content, feeds it to parse and writes
// every entry received from parse into the DynamoDB table tableName.
//
// At most maxGoroutines inserts run concurrently. The function returns once
// the channel returned by parse has been closed and every in-flight insert
// has finished. Failures of individual inserts are logged and do not abort
// the run; the only error returned is a failure to open the gzip stream.
func fillDB(svcDB *dynamodb.DynamoDB, tableName string, content io.Reader) error {
	reader, err := gzip.NewReader(content)
	if err != nil {
		return err
	}
	defer reader.Close()

	// guard is a counting semaphore: one slot per running insert goroutine.
	guard := make(chan struct{}, maxGoroutines)
	for v := range parse(reader) {
		guard <- struct{}{}
		// Pass the entry by value: handing out the address of the loop
		// variable would make every goroutine share (and race on) the same
		// entry on Go versions before 1.22.
		go func(v entry) {
			// Release the slot even if the insert panics.
			defer func() { <-guard }()
			putEntry(svcDB, tableName, v)
		}(v)
	}

	// Wait for the in-flight inserts: once every slot can be taken here, no
	// worker goroutine is still running.
	for i := 0; i < cap(guard); i++ {
		guard <- struct{}{}
	}
	return nil
}

// putEntry stores v in tableName together with an "Expiration" attribute set
// to the Unix time 24 hours from now. Writes rejected because the provisioned
// throughput was exceeded are retried with exponential backoff; any other
// failure is logged and the entry is dropped.
func putEntry(svcDB *dynamodb.DynamoDB, tableName string, v entry) {
	item, err := dynamodbattribute.MarshalMap(v)
	if err != nil {
		log.Printf("Error marshalling %v (%v)", v, err)
		return
	}
	expiration, err := dynamodbattribute.Marshal(time.Now().Add(24 * time.Hour).Unix())
	if err != nil {
		log.Printf("Error marshalling expiration for %v (%v)", v, err)
		return
	}
	item["Expiration"] = expiration

	params := &dynamodb.PutItemInput{
		Item:      item,
		TableName: aws.String(tableName),
	}
	err = backoff.Retry(func() error {
		// Put the item, discarding the result.
		_, err := svcDB.PutItem(params)
		if err == nil {
			return nil
		}
		// Only throttling is worth retrying. The checked assertion avoids a
		// panic when the SDK returns an error that is not an awserr.Error.
		if aerr, ok := err.(awserr.Error); ok && aerr.Code() == dynamodb.ErrCodeProvisionedThroughputExceededException {
			return err
		}
		// TODO: Special case...
		log.Printf("Error inserting %v (%v)", v, err)
		return nil
	}, backoff.NewExponentialBackOff())
	if err != nil {
		// The backoff policy gave up while the table was still throttling.
		log.Printf("Giving up inserting %v (%v)", v, err)
	}
}
@owulveryck
Copy link
Author

screenshot 2017-03-10 at 14 44 12

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment