Skip to content

Instantly share code, notes, and snippets.

@michaljemala
Created March 15, 2018 11:40
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save michaljemala/200e072d881349b2b143beffcbdf8f91 to your computer and use it in GitHub Desktop.
Save michaljemala/200e072d881349b2b143beffcbdf8f91 to your computer and use it in GitHub Desktop.
AWS - Subscribe queue to a topic
package main
import (
"flag"
"fmt"
"log"
"os"
"time"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/endpoints"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/sns"
"github.com/aws/aws-sdk-go/service/sqs"
)
var (
topicArn = flag.String("topic", "", "topic ARN to subscribe to")
accessKey = flag.String("accesskey", "", "access key")
secretKey = flag.String("secretkey", "", "secret key")
message = flag.String("message", "", "a test message to content to publish to topic and read from queue")
)
func main() {
flag.Parse()
if *topicArn == "" || *accessKey == "" || *secretKey == "" || *message == "" {
flag.Usage()
os.Exit(1)
}
session := session.Must(session.NewSession(
&aws.Config{Credentials: credentials.NewStaticCredentials(
*accessKey,
*secretKey,
"",
)},
&aws.Config{Region: aws.String(endpoints.UsEast1RegionID)},
))
sqsClient := sqs.New(session)
out1, err := sqsClient.CreateQueue(&sqs.CreateQueueInput{
QueueName: aws.String(fmt.Sprintf("queue-%d", time.Now().Unix())),
})
if err != nil {
log.Fatal(err)
}
out2, err := sqsClient.GetQueueAttributes(&sqs.GetQueueAttributesInput{
QueueUrl: out1.QueueUrl,
AttributeNames: []*string{
aws.String("QueueArn"),
},
})
if err != nil {
sqsClient.DeleteQueue(&sqs.DeleteQueueInput{
QueueUrl: out1.QueueUrl,
})
log.Fatal(err)
}
queueArn, found := out2.Attributes["QueueArn"]
if !found {
log.Fatal("no queue ARN attribute")
}
snsClient := sns.New(session)
_, err = snsClient.Subscribe(&sns.SubscribeInput{
TopicArn: topicArn,
Protocol: aws.String("sqs"),
Endpoint: queueArn,
})
if err != nil {
sqsClient.DeleteQueue(&sqs.DeleteQueueInput{
QueueUrl: out1.QueueUrl,
})
log.Fatal(err)
}
policy := fmt.Sprintf(`{
"Version":"2008-10-17",
"Statement":[{
"Effect":"Allow",
"Principal":"*",
"Action": ["sqs:SendMessage"],
"Resource":["%s"],
"Condition":{
"ArnLike":{"aws:SourceArn":["%s"]}
}
}]
}`, *queueArn, *topicArn)
_, err = sqsClient.SetQueueAttributes(&sqs.SetQueueAttributesInput{
QueueUrl: out1.QueueUrl,
Attributes: map[string]*string{
"Policy": aws.String(policy),
},
})
if err != nil {
sqsClient.DeleteQueue(&sqs.DeleteQueueInput{
QueueUrl: out1.QueueUrl,
})
log.Fatal(err)
}
_, err = snsClient.Publish(&sns.PublishInput{
Message: message,
TopicArn: topicArn,
})
if err != nil {
sqsClient.DeleteQueue(&sqs.DeleteQueueInput{
QueueUrl: out1.QueueUrl,
})
log.Fatal(err)
}
out3, err := sqsClient.ReceiveMessage(&sqs.ReceiveMessageInput{
QueueUrl: out1.QueueUrl,
WaitTimeSeconds: aws.Int64(10),
})
if err != nil {
sqsClient.DeleteQueue(&sqs.DeleteQueueInput{
QueueUrl: out1.QueueUrl,
})
log.Fatal(err)
}
for _, m := range out3.Messages {
if m != nil && m.Body != nil {
fmt.Println(*m.Body)
}
}
sqsClient.DeleteQueue(&sqs.DeleteQueueInput{
QueueUrl: out1.QueueUrl,
})
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment