Skip to content

Instantly share code, notes, and snippets.

@tpoxa
Created April 25, 2021 16:17
Show Gist options
  • Save tpoxa/dd21948e392f32112dea9b99282add74 to your computer and use it in GitHub Desktop.
Save tpoxa/dd21948e392f32112dea9b99282add74 to your computer and use it in GitHub Desktop.
package consumer
import (
"context"
"fmt"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/service/kinesis"
"github.com/pkg/errors"
"sync"
"time"
)
type IConsumer interface {
Consume(ctx context.Context, consumerName, streamName string, output chan []byte) error
}
type Consumer struct {
svc *kinesis.Kinesis
}
func NewConsumer(svc *kinesis.Kinesis) *Consumer {
return &Consumer{svc: svc}
}
func (c Consumer) Consume(ctx context.Context, consumerName, streamName string, output chan []byte) error {
streamDescribe, err := c.svc.DescribeStream(&kinesis.DescribeStreamInput{
StreamName: &streamName,
})
if err != nil {
return err
}
consumerOutput, err := c.registerConsumer(ctx, &consumerName, streamDescribe.StreamDescription.StreamARN)
if err != nil {
return err
}
defer c.deregister(streamDescribe.StreamDescription.StreamARN, &consumerName)
consumer := consumerOutput.Consumer
//////////
//
err = c.waitConsumerBeingActive(ctx, consumer.ConsumerARN, streamDescribe.StreamDescription.StreamARN)
if err != nil {
return errors.Wrap(err, "wait consumer being active error")
}
/////////////
shardsOutput, err := c.svc.ListShardsWithContext(ctx, &kinesis.ListShardsInput{
StreamName: streamDescribe.StreamDescription.StreamName,
})
if err != nil {
return errors.Wrap(err, "list shards error")
}
var wg sync.WaitGroup
for _, shard := range shardsOutput.Shards {
wg.Add(1)
go func(ctx context.Context, shardId *string) {
defer wg.Done()
SubscribeLoop:
for {
subscribeStart := time.Now()
typ_ := kinesis.ShardFilterTypeAtTimestamp
subscribeOutput, err := c.svc.SubscribeToShardWithContext(ctx, &kinesis.SubscribeToShardInput{
ConsumerARN: consumer.ConsumerARN,
ShardId: shardId,
StartingPosition: &kinesis.StartingPosition{
Timestamp: &subscribeStart,
Type: &typ_,
},
})
if err != nil {
// reconnect
return
}
for {
select {
case <-ctx.Done():
return
case e, ok := <-subscribeOutput.EventStream.Events():
if !ok {
// reconnect
continue SubscribeLoop
}
if event, ok := e.(*kinesis.SubscribeToShardEvent); ok {
for _, r := range event.Records {
output <- r.Data
}
}
}
}
}
}(ctx, shard.ShardId)
}
wg.Wait()
return nil
}
func (c Consumer) registerConsumer(ctx context.Context, consumerName, streamARN *string) (*kinesis.RegisterStreamConsumerOutput, error) {
ctx, cancel := context.WithTimeout(ctx, time.Second*10)
defer cancel()
for {
select {
case <-ctx.Done():
return nil, fmt.Errorf("timeout or canceled")
default:
output, err := c.svc.RegisterStreamConsumerWithContext(ctx, &kinesis.RegisterStreamConsumerInput{
StreamARN: streamARN,
ConsumerName: consumerName,
})
if err == nil {
return output, nil
}
if awsE, ok := err.(awserr.Error); ok {
if awsE.Code() != kinesis.ErrCodeResourceInUseException {
return nil, err
}
c.deregister(streamARN, consumerName)
time.Sleep(time.Second)
}
}
}
}
func (c Consumer) waitConsumerBeingActive(ctx context.Context, consumerARN, streamARN *string) error {
ctx, cancel := context.WithTimeout(ctx, time.Second*15)
defer cancel()
for {
select {
case <-ctx.Done():
return fmt.Errorf("timeout or canceled")
default:
info, err := c.describeConsumer(ctx, consumerARN, streamARN)
if err != nil {
return err
}
if info != nil && info.ConsumerDescription != nil && *info.ConsumerDescription.ConsumerStatus == kinesis.ConsumerStatusActive {
return nil
}
time.Sleep(time.Second)
}
}
}
func (c Consumer) describeConsumer(ctx context.Context, consumerARN *string, streamARN *string) (*kinesis.DescribeStreamConsumerOutput, error) {
return c.svc.DescribeStreamConsumerWithContext(ctx, &kinesis.DescribeStreamConsumerInput{
ConsumerARN: consumerARN,
StreamARN: streamARN,
})
}
func (c Consumer) deregister(streamARN *string, consumerName *string) error {
_, err := c.svc.DeregisterStreamConsumer(&kinesis.DeregisterStreamConsumerInput{
StreamARN: streamARN,
ConsumerName: consumerName,
})
return err
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment