Last active
January 31, 2020 07:34
-
-
Save Shubhamnegi/50ea0e5d064b3a2f1555cb47d70108f8 to your computer and use it in GitHub Desktop.
Transfer or archive messages from a source SQS queue to a destination SQS queue or to Elasticsearch
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"fmt" | |
"flag" | |
"bytes" | |
"encoding/json" | |
"github.com/aws/aws-sdk-go/aws" | |
"github.com/aws/aws-sdk-go/aws/session" | |
"github.com/aws/aws-sdk-go/service/sqs" | |
"io/ioutil" | |
"net/http" | |
"strconv" | |
"time" | |
) | |
var region string = "ap-southeast-1" | |
var localSession *session.Session | |
var sqsClient *sqs.SQS | |
var profile string = "default" // Hard coded | |
var sourceQueueURL string = "" | |
var destQueueURL string = "" | |
var esIndex string = "sqs-archive" | |
var esHost string = "" | |
var instance string = "" | |
func createSession() { | |
sess, err := session.NewSessionWithOptions(session.Options{ | |
Profile: profile, | |
Config: aws.Config{ | |
Region: aws.String(region), | |
}, | |
SharedConfigState: session.SharedConfigEnable, | |
}) | |
if err != nil { | |
fmt.Println(err) | |
return | |
} | |
localSession = sess | |
} | |
func createSqsClient() { | |
sqsClient = sqs.New(localSession) | |
} | |
func deleteMessage(queue *string, messageID *string) *sqs.DeleteMessageOutput { | |
fmt.Println("deleting message from %v", *queue) | |
result, error := sqsClient.DeleteMessage(&sqs.DeleteMessageInput{QueueUrl: queue, ReceiptHandle: messageID}) | |
if error != nil { | |
fmt.Println(error) | |
} | |
return result | |
} | |
func processMessage(body *string, messageID *string, archive *bool) { | |
fmt.Println(*body, " : processig message") | |
// If arhive is enabled then dont need to push to main queue | |
if *archive == false { | |
resultPush := pushMessage(body, destQueueURL) | |
fmt.Println(*resultPush.MessageId, "push") | |
} else { | |
// Store message | |
archiveMessage(body) | |
} | |
deleteResult := deleteMessage(&sourceQueueURL, messageID) | |
fmt.Println(deleteResult, "delete") | |
} | |
func pushMessage(body *string, destQueueURL string) *sqs.SendMessageOutput { | |
fmt.Println("pushing new message to:", destQueueURL) | |
result, error := sqsClient.SendMessage(&sqs.SendMessageInput{MessageBody: body, QueueUrl: &destQueueURL}) | |
if error != nil { | |
fmt.Println(error) | |
} | |
return result | |
} | |
func recieveMessage(numberOfmessage int64, sourceQueueURL string) []*sqs.Message { | |
fmt.Println("Polling for message") | |
var waitTime int64 = 20 | |
result, err := sqsClient.ReceiveMessage(&sqs.ReceiveMessageInput{ | |
MaxNumberOfMessages: aws.Int64(numberOfmessage), | |
QueueUrl: &sourceQueueURL, | |
WaitTimeSeconds: aws.Int64(waitTime), | |
}) | |
if err != nil { | |
fmt.Println(err) | |
} | |
fmt.Println(len(result.Messages), ": Length of message") | |
return result.Messages | |
} | |
func getEsURL() string { | |
return esHost + esIndex | |
} | |
func checkElastiSearchIndex() { | |
client := &http.Client{} | |
resp, err := client.Head(getEsURL()) | |
if err != nil { | |
panic("Error checking elasticsearch index") | |
} | |
if resp.StatusCode == http.StatusOK { | |
fmt.Println("Index %v already exists", esIndex) | |
} else { | |
fmt.Println("Index doesn't exists, creating new index") | |
createElasticSearchIndex() | |
} | |
} | |
func createElasticSearchIndex() { | |
requestBody := map[string]interface{}{ | |
"settings": map[string]interface{}{ | |
"index": map[string]interface{}{ | |
"number_of_shards": 2, | |
"number_of_replicas": 1, | |
}, | |
}, | |
"mappings": map[string]interface{}{ | |
"message": map[string]interface{}{ | |
"properties": map[string]interface{}{ | |
"queue": map[string]interface{}{ | |
"type": "text", | |
}, | |
"body": map[string]interface{}{ | |
"type": "text", | |
"index": false, | |
}, | |
"instance": map[string]interface{}{ | |
"type": "keyword", | |
}, | |
}, | |
}, | |
}, | |
} | |
json, err := json.Marshal(requestBody) | |
if err != nil { | |
panic("Error occured parsing") | |
} | |
client := &http.Client{} | |
req, err := http.NewRequest(http.MethodPut, getEsURL(), bytes.NewBuffer(json)) | |
if err != nil { | |
panic("Error occured creating request") | |
} | |
req.Header.Add("content-type", "application/json") | |
resp, err := client.Do(req) | |
if err != nil { | |
fmt.Println(err) | |
} | |
defer resp.Body.Close() | |
fmt.Println(resp.StatusCode) | |
if resp.StatusCode != http.StatusOK { | |
bodyBytes, err := ioutil.ReadAll(resp.Body) | |
if err != nil { | |
fmt.Println(err) | |
} | |
fmt.Print(string(bodyBytes)) | |
} | |
} | |
func archiveMessage(body *string) { | |
requestBody := map[string]interface{}{ | |
"body": body, | |
"instance": instance, | |
"queue": sourceQueueURL, | |
} | |
json, err := json.Marshal(requestBody) | |
if err != nil { | |
panic(err) | |
} | |
req, err := http.NewRequest(http.MethodPost, getEsURL()+"/message", bytes.NewBuffer(json)) | |
if err != nil { | |
panic(err) | |
} | |
req.Header.Set("Content-Type", "application/json") | |
client := &http.Client{} | |
resp, err := client.Do(req) | |
if err != nil { | |
panic(err) | |
} | |
defer resp.Body.Close() | |
fmt.Println(resp.StatusCode) | |
bodyBytes, err := ioutil.ReadAll(resp.Body) | |
if err != nil { | |
fmt.Println(err) | |
} | |
fmt.Print(string(bodyBytes)) | |
} | |
func main() { | |
source := flag.String("source", "", "Name of the source queue") // Source queue refrence | |
dest := flag.String("dest", "", "Name of the destination queue") // Destintion queueue refrence | |
archive := flag.Bool("archive", false, "To archive message to s3, will ignore destination") // will archive messages to s3 | |
index := flag.String("index", "sqs-archive", "Index value for elasticsearch") // index for elasticsearch | |
host := flag.String("host", "", "Host value for elasticsearch") // Host for elasticsearch | |
flag.Parse() | |
sourceQueueURL = *source | |
destQueueURL = *dest | |
if sourceQueueURL == "" { | |
panic("Missing source url") | |
} | |
// If archive is enabled destination queue is not required | |
// Since the data will stored in some datasource | |
if *archive == false && destQueueURL == "" { | |
panic("Invalid parameters, Requested param source,dest") | |
} | |
if *archive == true { | |
if esHost == "" { | |
panic("Missing eshost") | |
} | |
if destQueueURL != "" { | |
fmt.Println("Ignoring destination queue") | |
} | |
esIndex = *index | |
esHost = *host | |
checkElastiSearchIndex() | |
now := time.Now() | |
nanos := now.UnixNano() | |
instance = strconv.FormatInt(nanos, 10) | |
fmt.Println("Query for archived instance: ", esHost+esIndex+"/_search?q=instance:"+instance) | |
} | |
fmt.Println("Create session") | |
createSession() | |
fmt.Println("Session created") | |
fmt.Println("Create SQS client") | |
createSqsClient() | |
fmt.Println("SQS client created") | |
fmt.Println("Recieve message") | |
for 1 == 1 { | |
messages := recieveMessage(10, sourceQueueURL) | |
for i := 0; i < len(messages); i++ { | |
processMessage(messages[i].Body, messages[i].ReceiptHandle, archive) | |
} | |
if len(messages) == 0 { | |
return | |
} | |
} | |
} |
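To transfer messages from one queue to another:
Usage: sqstransfer -source="queue_url" -dest="queue_url"
To archive messages to Elasticsearch (give the host with its scheme and a trailing slash, e.g. http://localhost:9200/):
Usage: sqstransfer -source="queue_url" -archive=true -host="es_host"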
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
env GOOS=linux GOARCH=amd64 GOARM=7 go build name_of_project