Skip to content

Instantly share code, notes, and snippets.

@seamusv
Created April 25, 2021 19:52
Show Gist options
  • Save seamusv/5569d3a67604221cfe1af7729118c5f8 to your computer and use it in GitHub Desktop.
Save seamusv/5569d3a67604221cfe1af7729118c5f8 to your computer and use it in GitHub Desktop.
DynamoDB Table Querier with RxGo
package scanner
import (
"context"
"fmt"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/dynamodb"
"github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
"github.com/reactivex/rxgo/v2"
)
// UsersShard queries the "show_id__shard" index of the "<env>.UserProfile"
// table once per shard key "<showId>#<n>", for n in [0, userShardsCount), and
// appends every returned item to *result.
//
// The per-shard queries run inside an rxgo pipeline configured with
// rxgo.WithContext(ctx) and rxgo.WithCPUPool(). Each shard is read to the end
// by following LastEvaluatedKey, so a shard whose data spans more than one
// Query page is not truncated.
//
// The returned channel carries any errors produced by the pipeline and is
// closed once the pipeline has finished. It is buffered with one slot per
// shard and written without blocking, so the background goroutine always
// terminates even when the caller stops receiving after the first error.
// Items may still be appended to *result until the channel is closed, so
// callers should read *result only after observing the close. result must be
// non-nil.
func (s *Scanner) UsersShard(ctx context.Context, showId string, result *[]map[string]types.AttributeValue) <-chan error {
	// Each shard query below returns at most one error, so one slot per shard
	// is a known upper bound for what the mapping stage can produce.
	out := make(chan error, userShardsCount)
	go func() {
		defer close(out)
		observable := rxgo.Range(0, userShardsCount).
			Map(func(ctx context.Context, i interface{}) (interface{}, error) {
				shardId := fmt.Sprintf("%s#%d", showId, i)
				input := &dynamodb.QueryInput{
					ExpressionAttributeValues: map[string]types.AttributeValue{
						":key": &types.AttributeValueMemberS{Value: shardId},
					},
					IndexName:              aws.String("show_id__shard"),
					KeyConditionExpression: aws.String("show_shard = :key"),
					TableName:              aws.String(fmt.Sprintf("%s.UserProfile", s.env)),
				}

				// A single Query call returns one page only; keep requesting
				// pages until DynamoDB stops handing back a continuation key.
				var items []map[string]types.AttributeValue
				for {
					page, err := s.svc.Query(ctx, input)
					if err != nil {
						return nil, err
					}
					items = append(items, page.Items...)
					if len(page.LastEvaluatedKey) == 0 {
						return items, nil
					}
					input.ExclusiveStartKey = page.LastEvaluatedKey
				}
			}, rxgo.WithContext(ctx), rxgo.WithCPUPool())
		<-observable.ForEach(
			func(i interface{}) {
				*result = append(*result, i.([]map[string]types.AttributeValue)...)
			},
			func(err error) {
				// Never block here: a caller that has stopped receiving must
				// not be able to wedge this goroutine and prevent close(out).
				select {
				case out <- err:
				default:
					// Buffer exhausted; earlier errors are already queued.
				}
			},
			func() {},
		)
	}()
	return out
}
// userShardsCount is the number of consecutive shard indices, starting at 0,
// that UsersShard hands to rxgo.Range when building its per-shard queries.
const userShardsCount = 100
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment