Skip to content

Instantly share code, notes, and snippets.

@eraserhd
Created July 10, 2024 19:06
Show Gist options
  • Save eraserhd/130b9817030d3e11aeb6495c66a5e354 to your computer and use it in GitHub Desktop.
Save eraserhd/130b9817030d3e11aeb6495c66a5e354 to your computer and use it in GitHub Desktop.
diff --git a/bindings/aws/sqs/sqs.go b/bindings/aws/sqs/sqs.go
index 465e061b..356062e4 100644
--- a/bindings/aws/sqs/sqs.go
+++ b/bindings/aws/sqs/sqs.go
@@ -33,13 +33,14 @@ import (
// AWSSQS allows receiving and sending data to/from AWS SQS.
type AWSSQS struct {
- Client *sqs.SQS
- QueueURL *string
-
- logger logger.Logger
- wg sync.WaitGroup
- closeCh chan struct{}
- closed atomic.Bool
+ Client *sqs.SQS
+
+ queueName string
+ queueURL atomic.Pointer[string]
+ logger logger.Logger
+ wg sync.WaitGroup
+ closeCh chan struct{}
+ closed atomic.Bool
}
type sqsMetadata struct {
@@ -65,21 +66,12 @@ func (a *AWSSQS) Init(ctx context.Context, metadata bindings.Metadata) error {
if err != nil {
return err
}
+ a.queueName = m.QueueName
client, err := a.getClient(m)
if err != nil {
return err
}
-
- queueName := m.QueueName
- resultURL, err := client.GetQueueUrlWithContext(ctx, &sqs.GetQueueUrlInput{
- QueueName: aws.String(queueName),
- })
- if err != nil {
- return err
- }
-
- a.QueueURL = resultURL.QueueUrl
a.Client = client
return nil
@@ -91,11 +83,14 @@ func (a *AWSSQS) Operations() []bindings.OperationKind {
func (a *AWSSQS) Invoke(ctx context.Context, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
msgBody := string(req.Data)
- _, err := a.Client.SendMessageWithContext(ctx, &sqs.SendMessageInput{
+ url, err := a.getQueueURL(ctx)
+ if err != nil {
+ return nil, err
+ }
+ _, err = a.Client.SendMessageWithContext(ctx, &sqs.SendMessageInput{
MessageBody: &msgBody,
- QueueUrl: a.QueueURL,
+ QueueUrl: &url,
})
-
return nil, err
}
@@ -104,6 +99,11 @@ func (a *AWSSQS) Read(ctx context.Context, handler bindings.Handler) error {
return errors.New("binding is closed")
}
+ url, err := a.getQueueURL(ctx)
+ if err != nil {
+ return err
+ }
+
a.wg.Add(1)
go func() {
defer a.wg.Done()
@@ -115,7 +115,7 @@ func (a *AWSSQS) Read(ctx context.Context, handler bindings.Handler) error {
}
result, err := a.Client.ReceiveMessageWithContext(ctx, &sqs.ReceiveMessageInput{
- QueueUrl: a.QueueURL,
+ QueueUrl: &url,
AttributeNames: aws.StringSlice([]string{
"SentTimestamp",
}),
@@ -126,7 +126,7 @@ func (a *AWSSQS) Read(ctx context.Context, handler bindings.Handler) error {
WaitTimeSeconds: aws.Int64(20),
})
if err != nil {
- a.logger.Errorf("Unable to receive message from queue %q, %v.", *a.QueueURL, err)
+ a.logger.Errorf("Unable to receive message from queue %q, %v.", url, err)
}
if len(result.Messages) > 0 {
@@ -141,7 +141,7 @@ func (a *AWSSQS) Read(ctx context.Context, handler bindings.Handler) error {
// Use a background context here because ctx may be canceled already
a.Client.DeleteMessageWithContext(context.Background(), &sqs.DeleteMessageInput{
- QueueUrl: a.QueueURL,
+ QueueUrl: &url,
ReceiptHandle: msgHandle,
})
}
@@ -187,6 +187,20 @@ func (a *AWSSQS) getClient(metadata *sqsMetadata) (*sqs.SQS, error) {
return c, nil
}
+func (a *AWSSQS) getQueueURL(ctx context.Context) (string, error) {
+ if url := a.queueURL.Load(); url != nil {
+ return *url, nil
+ }
+ resultURL, err := a.Client.GetQueueUrlWithContext(ctx, &sqs.GetQueueUrlInput{
+ QueueName: aws.String(a.queueName),
+ })
+ if err != nil {
+ return "", err
+ }
+ a.queueURL.CompareAndSwap(nil, resultURL.QueueUrl)
+ return *resultURL.QueueUrl, nil
+}
+
// GetComponentMetadata returns the metadata of the component.
func (a *AWSSQS) GetComponentMetadata() (metadataInfo metadata.MetadataMap) {
metadataStruct := sqsMetadata{}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment