Skip to content

Instantly share code, notes, and snippets.

@p4tin
Created August 16, 2018 16:01
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save p4tin/6028168f06a4eb1b16527de26168f429 to your computer and use it in GitHub Desktop.
Save p4tin/6028168f06a4eb1b16527de26168f429 to your computer and use it in GitHub Desktop.
Copy messages from one AWS SQS queue to another (each message is removed from the source queue once it has been sent to the destination)
package main
import (
"flag"
"fmt"
"log"
"os"
"sync"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/sqs"
)
func main() {
src := flag.String("src", "", "source queue")
dest := flag.String("dest", "", "destination queue")
flag.Parse()
if *src == "" || *dest == "" {
flag.Usage()
os.Exit(1)
}
if os.Getenv("AWS_REGION") == "" {
fmt.Printf("AWS_REGION not set")
os.Exit(1)
}
region := os.Getenv("AWS_REGION")
log.Printf("source queue : %v", *src)
log.Printf("destination queue : %v", *dest)
config := &aws.Config{
Region: &region,
}
client := sqs.New(session.New(), config)
maxMessages := int64(10)
waitTime := int64(0)
messageAttributeNames := aws.StringSlice([]string{"All"})
rmin := &sqs.ReceiveMessageInput{
QueueUrl: src,
MaxNumberOfMessages: &maxMessages,
WaitTimeSeconds: &waitTime,
MessageAttributeNames: messageAttributeNames,
}
// loop as long as there are messages on the queue
for {
resp, err := client.ReceiveMessage(rmin)
if err != nil {
panic(err)
}
if len(resp.Messages) == 0 {
log.Printf("done")
return
}
log.Printf("received %v messages...", len(resp.Messages))
var wg sync.WaitGroup
wg.Add(len(resp.Messages))
for _, m := range resp.Messages {
go func(m *sqs.Message) {
defer wg.Done()
// write the message to the destination queue
smi := sqs.SendMessageInput{
MessageAttributes: m.MessageAttributes,
MessageBody: m.Body,
QueueUrl: dest,
}
_, err := client.SendMessage(&smi)
if err != nil {
log.Printf("ERROR sending message to destination %v", err)
return
}
// message was sent, dequeue from source queue
dmi := &sqs.DeleteMessageInput{
QueueUrl: src,
ReceiptHandle: m.ReceiptHandle,
}
if _, err := client.DeleteMessage(dmi); err != nil {
log.Printf("ERROR dequeueing message ID %v : %v",
*m.ReceiptHandle,
err)
}
}(m)
}
// wait for all jobs from this batch...
wg.Wait()
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment