Created
February 3, 2020 13:57
-
-
Save Shubhamnegi/7873ed539b93e97ddd53f8ef7cd61f8a to your computer and use it in GitHub Desktop.
To restore messages from Elasticsearch to SQS, written in Go
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"bytes" | |
"encoding/json" | |
"flag" | |
"fmt" | |
"github.com/aws/aws-sdk-go/aws" | |
"github.com/aws/aws-sdk-go/aws/session" | |
"github.com/aws/aws-sdk-go/service/sqs" | |
"io/ioutil" | |
"net/http" | |
"strconv" | |
) | |
// variables | |
var ( | |
scrollID string = "" | |
esHost string = "" | |
esindex string = "" | |
sqsClient *sqs.SQS | |
destQueueURL *string | |
) | |
func createSession(profile *string, region *string) (*session.Session, error) { | |
sess, err := session.NewSessionWithOptions(session.Options{ | |
Profile: *profile, | |
Config: aws.Config{ | |
Region: aws.String(*region), | |
}, | |
SharedConfigState: session.SharedConfigEnable, | |
}) | |
if err != nil { | |
fmt.Println(err) | |
return nil, err | |
} | |
return sess, nil | |
} | |
func createSqsClient(profile *string, region *string) { | |
sess, err := createSession(profile, region) | |
if err != nil { | |
panic("error creating sqs client") | |
} | |
sqsClient = sqs.New(sess) | |
} | |
// functions | |
func getQueueMessageFromES(instance *string) map[string]interface{} { | |
if scrollID != "" { | |
fmt.Println("scrolling") | |
return scrollEs() | |
} | |
fmt.Println("search") | |
return searchEs(instance) | |
} | |
func getEsHost() string { | |
return esHost + esindex | |
} | |
func getSeachURL() string { | |
return getEsHost() + "/_search?scroll=1m" | |
} | |
// getQuery builds the JSON search body: a bool/filter query with a single
// term clause on the "instance" field, requesting pages of 100 hits.
//
// It panics if the body cannot be marshalled; the message includes the cause.
func getQuery(instance *string) []byte {
	request := map[string]interface{}{
		"query": map[string]interface{}{
			"bool": map[string]interface{}{
				"filter": []map[string]interface{}{
					{
						"term": map[string]string{
							"instance": *instance,
						},
					},
				},
			},
		},
		"size": 100,
	}
	// The result is deliberately not named "json": that would shadow the
	// encoding/json package for the rest of the function.
	payload, err := json.Marshal(request)
	if err != nil {
		panic("Error occured parsing: " + err.Error())
	}
	return payload
}
// To query elasticsearh. Store scroll id from here | |
func searchEs(instance *string) map[string]interface{} { | |
url := getSeachURL() | |
client := &http.Client{} | |
req, err := http.NewRequest(http.MethodPost, url, bytes.NewBuffer(getQuery(instance))) | |
if err != nil { | |
panic("Error occured creating request") | |
} | |
req.Header.Add("content-type", "application/json") | |
resp, err := client.Do(req) | |
if err != nil { | |
fmt.Println(err) | |
} | |
defer resp.Body.Close() | |
bodyBytes, err := ioutil.ReadAll(resp.Body) | |
bodyString := string(bodyBytes) | |
if resp.StatusCode != http.StatusOK { | |
panic("error occured with index:" + bodyString) | |
} | |
responseMap := make(map[string]interface{}) | |
err = json.Unmarshal([]byte(bodyString), &responseMap) | |
if err != nil { | |
panic(err) | |
} | |
storeScrollID(&responseMap) | |
totalHits := responseMap["hits"].(map[string]interface{})["total"].(float64) | |
fmt.Println("Total hits:", totalHits) | |
return responseMap | |
} | |
func storeScrollID(esResopnse *map[string]interface{}) string { | |
response := *esResopnse | |
scrollID = response["_scroll_id"].(string) | |
fmt.Println("scrollID: ", scrollID) | |
return scrollID | |
} | |
// To scroll elasticsearch | |
func scrollEs() map[string]interface{} { | |
url := esHost + "_search/scroll" | |
request := map[string]string{ | |
"scroll": "1m", | |
"scroll_id": scrollID, | |
} | |
requestJSON, err := json.Marshal(request) | |
if err != nil { | |
fmt.Println(err) | |
} | |
reqst, err := http.NewRequest(http.MethodPost, url, bytes.NewBuffer(requestJSON)) | |
if err != nil { | |
panic("Error creating request") | |
} | |
client := &http.Client{} | |
reqst.Header.Add("content-type", "application/json") | |
resp, err := client.Do(reqst) | |
if err != nil { | |
panic("Error scrolling es") | |
} | |
defer resp.Body.Close() | |
bodyBytes, err := ioutil.ReadAll(resp.Body) | |
bodyString := string(bodyBytes) | |
if resp.StatusCode != http.StatusOK { | |
panic("error occured with index: " + bodyString) | |
} | |
responseMap := make(map[string]interface{}) | |
err = json.Unmarshal([]byte(bodyString), &responseMap) | |
if err != nil { | |
panic(err) | |
} | |
storeScrollID(&responseMap) | |
return responseMap | |
} | |
// check index exists in elasticearch | |
func processEsResult(esResult *map[string]interface{}) bool { | |
result := *esResult | |
hits := result["hits"].(map[string]interface{})["hits"].([]interface{}) | |
if len(hits) == 0 { | |
return false | |
} | |
for _, v := range hits { | |
_source := v.(map[string]interface{})["_source"] | |
_id := v.(map[string]interface{})["_id"].(string) | |
fmt.Println("processing id:" + _id) | |
body := _source.(map[string]interface{})["body"].(string) | |
_, err := sendMessageToSQS(&body) // To send message to sqs | |
if err != nil { | |
fmt.Println(err) | |
panic("Error pushing to queue") | |
} | |
fmt.Println("Message pushed") | |
deleteMessageEs(_id) // on success delete message from sqs | |
} | |
return true | |
} | |
func deleteMessageEs(id string) { | |
url := getEsHost() + "/" + "message/" + id | |
request, err := http.NewRequest(http.MethodDelete, url, nil) | |
if err != nil { | |
fmt.Print("Error creating delete request", id) | |
} | |
client := &http.Client{} | |
fmt.Println("delete message with id:" + id) | |
res, err := client.Do(request) | |
if err != nil { | |
fmt.Println("Error deleting message with id:", id, err) | |
panic("error deleting message") | |
} | |
fmt.Println("Delete status: " + strconv.Itoa(res.StatusCode)) | |
if res.StatusCode != http.StatusOK { | |
fmt.Print("!200 http status recieved id:", id) | |
} | |
} | |
func sendMessageToSQS(body *string) (*sqs.SendMessageOutput, error) { | |
result, error := sqsClient.SendMessage(&sqs.SendMessageInput{MessageBody: body, QueueUrl: destQueueURL}) | |
if error != nil { | |
return nil, error | |
} | |
return result, nil | |
} | |
// main function | |
func main() { | |
fmt.Println("Welcome to sqs restore") | |
dest := flag.String("dest", "", "SQS URL for recovery") | |
profile := flag.String("profile", "default", "AWS profile to use") | |
region := flag.String("region", "", "AWS profile to use") | |
host := flag.String("host", "", "Elasticsearch host") | |
index := flag.String("index", "sqs-archive", "Index for sqs archive") | |
instance := flag.String("instance", "", "Instance id to pull messages for") | |
flag.Parse() | |
fmt.Println(*host, *index, *instance) | |
if *host == "" || *index == "" || *dest == "" || *instance == "" { | |
panic("Missing required parameters") | |
} | |
esHost = *host | |
esindex = *index | |
destQueueURL = dest | |
createSqsClient(profile, region) | |
for 1 == 1 { | |
result := getQueueMessageFromES(instance) | |
haveHits := processEsResult(&result) | |
if haveHits == false { | |
fmt.Println("0 hits found quiting") | |
return | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
To restore messages
sqsRestore --host="http://es_host" --index="sqs-archive" --instance="filter_by_instance" --dest="http://aws/queue_url" --profile="default"