Skip to content

Instantly share code, notes, and snippets.

@Shubhamnegi
Created February 3, 2020 13:57
Show Gist options
  • Save Shubhamnegi/7873ed539b93e97ddd53f8ef7cd61f8a to your computer and use it in GitHub Desktop.
Save Shubhamnegi/7873ed539b93e97ddd53f8ef7cd61f8a to your computer and use it in GitHub Desktop.
To restore messages from elasticsearch to go
package main
import (
"bytes"
"encoding/json"
"flag"
"fmt"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/sqs"
"io/ioutil"
"net/http"
"strconv"
)
// variables
var (
scrollID string = ""
esHost string = ""
esindex string = ""
sqsClient *sqs.SQS
destQueueURL *string
)
func createSession(profile *string, region *string) (*session.Session, error) {
sess, err := session.NewSessionWithOptions(session.Options{
Profile: *profile,
Config: aws.Config{
Region: aws.String(*region),
},
SharedConfigState: session.SharedConfigEnable,
})
if err != nil {
fmt.Println(err)
return nil, err
}
return sess, nil
}
func createSqsClient(profile *string, region *string) {
sess, err := createSession(profile, region)
if err != nil {
panic("error creating sqs client")
}
sqsClient = sqs.New(sess)
}
// functions
func getQueueMessageFromES(instance *string) map[string]interface{} {
if scrollID != "" {
fmt.Println("scrolling")
return scrollEs()
}
fmt.Println("search")
return searchEs(instance)
}
func getEsHost() string {
return esHost + esindex
}
func getSeachURL() string {
return getEsHost() + "/_search?scroll=1m"
}
func getQuery(instance *string) []byte {
request := map[string]interface{}{
"query": map[string]interface{}{
"bool": map[string]interface{}{
"filter": []map[string]interface{}{
{
"term": map[string]string{
"instance": *instance,
},
},
},
},
},
"size": 100,
}
json, err := json.Marshal(request)
if err != nil {
panic("Error occured parsing")
}
return json
}
// To query elasticsearh. Store scroll id from here
func searchEs(instance *string) map[string]interface{} {
url := getSeachURL()
client := &http.Client{}
req, err := http.NewRequest(http.MethodPost, url, bytes.NewBuffer(getQuery(instance)))
if err != nil {
panic("Error occured creating request")
}
req.Header.Add("content-type", "application/json")
resp, err := client.Do(req)
if err != nil {
fmt.Println(err)
}
defer resp.Body.Close()
bodyBytes, err := ioutil.ReadAll(resp.Body)
bodyString := string(bodyBytes)
if resp.StatusCode != http.StatusOK {
panic("error occured with index:" + bodyString)
}
responseMap := make(map[string]interface{})
err = json.Unmarshal([]byte(bodyString), &responseMap)
if err != nil {
panic(err)
}
storeScrollID(&responseMap)
totalHits := responseMap["hits"].(map[string]interface{})["total"].(float64)
fmt.Println("Total hits:", totalHits)
return responseMap
}
func storeScrollID(esResopnse *map[string]interface{}) string {
response := *esResopnse
scrollID = response["_scroll_id"].(string)
fmt.Println("scrollID: ", scrollID)
return scrollID
}
// To scroll elasticsearch
func scrollEs() map[string]interface{} {
url := esHost + "_search/scroll"
request := map[string]string{
"scroll": "1m",
"scroll_id": scrollID,
}
requestJSON, err := json.Marshal(request)
if err != nil {
fmt.Println(err)
}
reqst, err := http.NewRequest(http.MethodPost, url, bytes.NewBuffer(requestJSON))
if err != nil {
panic("Error creating request")
}
client := &http.Client{}
reqst.Header.Add("content-type", "application/json")
resp, err := client.Do(reqst)
if err != nil {
panic("Error scrolling es")
}
defer resp.Body.Close()
bodyBytes, err := ioutil.ReadAll(resp.Body)
bodyString := string(bodyBytes)
if resp.StatusCode != http.StatusOK {
panic("error occured with index: " + bodyString)
}
responseMap := make(map[string]interface{})
err = json.Unmarshal([]byte(bodyString), &responseMap)
if err != nil {
panic(err)
}
storeScrollID(&responseMap)
return responseMap
}
// check index exists in elasticearch
func processEsResult(esResult *map[string]interface{}) bool {
result := *esResult
hits := result["hits"].(map[string]interface{})["hits"].([]interface{})
if len(hits) == 0 {
return false
}
for _, v := range hits {
_source := v.(map[string]interface{})["_source"]
_id := v.(map[string]interface{})["_id"].(string)
fmt.Println("processing id:" + _id)
body := _source.(map[string]interface{})["body"].(string)
_, err := sendMessageToSQS(&body) // To send message to sqs
if err != nil {
fmt.Println(err)
panic("Error pushing to queue")
}
fmt.Println("Message pushed")
deleteMessageEs(_id) // on success delete message from sqs
}
return true
}
func deleteMessageEs(id string) {
url := getEsHost() + "/" + "message/" + id
request, err := http.NewRequest(http.MethodDelete, url, nil)
if err != nil {
fmt.Print("Error creating delete request", id)
}
client := &http.Client{}
fmt.Println("delete message with id:" + id)
res, err := client.Do(request)
if err != nil {
fmt.Println("Error deleting message with id:", id, err)
panic("error deleting message")
}
fmt.Println("Delete status: " + strconv.Itoa(res.StatusCode))
if res.StatusCode != http.StatusOK {
fmt.Print("!200 http status recieved id:", id)
}
}
func sendMessageToSQS(body *string) (*sqs.SendMessageOutput, error) {
result, error := sqsClient.SendMessage(&sqs.SendMessageInput{MessageBody: body, QueueUrl: destQueueURL})
if error != nil {
return nil, error
}
return result, nil
}
// main function
func main() {
fmt.Println("Welcome to sqs restore")
dest := flag.String("dest", "", "SQS URL for recovery")
profile := flag.String("profile", "default", "AWS profile to use")
region := flag.String("region", "", "AWS profile to use")
host := flag.String("host", "", "Elasticsearch host")
index := flag.String("index", "sqs-archive", "Index for sqs archive")
instance := flag.String("instance", "", "Instance id to pull messages for")
flag.Parse()
fmt.Println(*host, *index, *instance)
if *host == "" || *index == "" || *dest == "" || *instance == "" {
panic("Missing required parameters")
}
esHost = *host
esindex = *index
destQueueURL = dest
createSqsClient(profile, region)
for 1 == 1 {
result := getQueueMessageFromES(instance)
haveHits := processEsResult(&result)
if haveHits == false {
fmt.Println("0 hits found quiting")
return
}
}
}
@Shubhamnegi
Copy link
Author

To restore messages

sqsRestore --host="http://es_host" --index="sqs-archive" --instance="filter_by_instance" --dest="http://aws/queue_url" --profile="default"

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment