Skip to content

Instantly share code, notes, and snippets.

@Shubhamnegi
Last active January 31, 2020 07:34
Show Gist options
  • Save Shubhamnegi/50ea0e5d064b3a2f1555cb47d70108f8 to your computer and use it in GitHub Desktop.
Save Shubhamnegi/50ea0e5d064b3a2f1555cb47d70108f8 to your computer and use it in GitHub Desktop.
Transfer/archive message from source SQS to dest SQS or elasticsearch
package main
import (
"fmt"
"flag"
"bytes"
"encoding/json"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/sqs"
"io/ioutil"
"net/http"
"strconv"
"time"
)
var region string = "ap-southeast-1"
var localSession *session.Session
var sqsClient *sqs.SQS
var profile string = "default" // Hard coded
var sourceQueueURL string = ""
var destQueueURL string = ""
var esIndex string = "sqs-archive"
var esHost string = ""
var instance string = ""
func createSession() {
sess, err := session.NewSessionWithOptions(session.Options{
Profile: profile,
Config: aws.Config{
Region: aws.String(region),
},
SharedConfigState: session.SharedConfigEnable,
})
if err != nil {
fmt.Println(err)
return
}
localSession = sess
}
func createSqsClient() {
sqsClient = sqs.New(localSession)
}
func deleteMessage(queue *string, messageID *string) *sqs.DeleteMessageOutput {
fmt.Println("deleting message from %v", *queue)
result, error := sqsClient.DeleteMessage(&sqs.DeleteMessageInput{QueueUrl: queue, ReceiptHandle: messageID})
if error != nil {
fmt.Println(error)
}
return result
}
func processMessage(body *string, messageID *string, archive *bool) {
fmt.Println(*body, " : processig message")
// If arhive is enabled then dont need to push to main queue
if *archive == false {
resultPush := pushMessage(body, destQueueURL)
fmt.Println(*resultPush.MessageId, "push")
} else {
// Store message
archiveMessage(body)
}
deleteResult := deleteMessage(&sourceQueueURL, messageID)
fmt.Println(deleteResult, "delete")
}
func pushMessage(body *string, destQueueURL string) *sqs.SendMessageOutput {
fmt.Println("pushing new message to:", destQueueURL)
result, error := sqsClient.SendMessage(&sqs.SendMessageInput{MessageBody: body, QueueUrl: &destQueueURL})
if error != nil {
fmt.Println(error)
}
return result
}
func recieveMessage(numberOfmessage int64, sourceQueueURL string) []*sqs.Message {
fmt.Println("Polling for message")
var waitTime int64 = 20
result, err := sqsClient.ReceiveMessage(&sqs.ReceiveMessageInput{
MaxNumberOfMessages: aws.Int64(numberOfmessage),
QueueUrl: &sourceQueueURL,
WaitTimeSeconds: aws.Int64(waitTime),
})
if err != nil {
fmt.Println(err)
}
fmt.Println(len(result.Messages), ": Length of message")
return result.Messages
}
func getEsURL() string {
return esHost + esIndex
}
func checkElastiSearchIndex() {
client := &http.Client{}
resp, err := client.Head(getEsURL())
if err != nil {
panic("Error checking elasticsearch index")
}
if resp.StatusCode == http.StatusOK {
fmt.Println("Index %v already exists", esIndex)
} else {
fmt.Println("Index doesn't exists, creating new index")
createElasticSearchIndex()
}
}
func createElasticSearchIndex() {
requestBody := map[string]interface{}{
"settings": map[string]interface{}{
"index": map[string]interface{}{
"number_of_shards": 2,
"number_of_replicas": 1,
},
},
"mappings": map[string]interface{}{
"message": map[string]interface{}{
"properties": map[string]interface{}{
"queue": map[string]interface{}{
"type": "text",
},
"body": map[string]interface{}{
"type": "text",
"index": false,
},
"instance": map[string]interface{}{
"type": "keyword",
},
},
},
},
}
json, err := json.Marshal(requestBody)
if err != nil {
panic("Error occured parsing")
}
client := &http.Client{}
req, err := http.NewRequest(http.MethodPut, getEsURL(), bytes.NewBuffer(json))
if err != nil {
panic("Error occured creating request")
}
req.Header.Add("content-type", "application/json")
resp, err := client.Do(req)
if err != nil {
fmt.Println(err)
}
defer resp.Body.Close()
fmt.Println(resp.StatusCode)
if resp.StatusCode != http.StatusOK {
bodyBytes, err := ioutil.ReadAll(resp.Body)
if err != nil {
fmt.Println(err)
}
fmt.Print(string(bodyBytes))
}
}
func archiveMessage(body *string) {
requestBody := map[string]interface{}{
"body": body,
"instance": instance,
"queue": sourceQueueURL,
}
json, err := json.Marshal(requestBody)
if err != nil {
panic(err)
}
req, err := http.NewRequest(http.MethodPost, getEsURL()+"/message", bytes.NewBuffer(json))
if err != nil {
panic(err)
}
req.Header.Set("Content-Type", "application/json")
client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
panic(err)
}
defer resp.Body.Close()
fmt.Println(resp.StatusCode)
bodyBytes, err := ioutil.ReadAll(resp.Body)
if err != nil {
fmt.Println(err)
}
fmt.Print(string(bodyBytes))
}
func main() {
source := flag.String("source", "", "Name of the source queue") // Source queue refrence
dest := flag.String("dest", "", "Name of the destination queue") // Destintion queueue refrence
archive := flag.Bool("archive", false, "To archive message to s3, will ignore destination") // will archive messages to s3
index := flag.String("index", "sqs-archive", "Index value for elasticsearch") // index for elasticsearch
host := flag.String("host", "", "Host value for elasticsearch") // Host for elasticsearch
flag.Parse()
sourceQueueURL = *source
destQueueURL = *dest
if sourceQueueURL == "" {
panic("Missing source url")
}
// If archive is enabled destination queue is not required
// Since the data will stored in some datasource
if *archive == false && destQueueURL == "" {
panic("Invalid parameters, Requested param source,dest")
}
if *archive == true {
if esHost == "" {
panic("Missing eshost")
}
if destQueueURL != "" {
fmt.Println("Ignoring destination queue")
}
esIndex = *index
esHost = *host
checkElastiSearchIndex()
now := time.Now()
nanos := now.UnixNano()
instance = strconv.FormatInt(nanos, 10)
fmt.Println("Query for archived instance: ", esHost+esIndex+"/_search?q=instance:"+instance)
}
fmt.Println("Create session")
createSession()
fmt.Println("Session created")
fmt.Println("Create SQS client")
createSqsClient()
fmt.Println("SQS client created")
fmt.Println("Recieve message")
for 1 == 1 {
messages := recieveMessage(10, sourceQueueURL)
for i := 0; i < len(messages); i++ {
processMessage(messages[i].Body, messages[i].ReceiptHandle, archive)
}
if len(messages) == 0 {
return
}
}
}
@Shubhamnegi
Copy link
Author

Shubhamnegi commented Nov 20, 2019

To transfer messages
Usage: sqstransfer -source="queueUrl" -dest="queueUrl"

@Shubhamnegi
Copy link
Author

Shubhamnegi commented Jan 31, 2020

To archive messages
sqstransfer --source="queue_url" --archive="true" --host="es_host"

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment