Skip to content

Instantly share code, notes, and snippets.

@graphaelli
Created September 30, 2022 21:10
Show Gist options
  • Save graphaelli/813f668213ac8c1c2ffd3f66d77a87b9 to your computer and use it in GitHub Desktop.
Save graphaelli/813f668213ac8c1c2ffd3f66d77a87b9 to your computer and use it in GitHub Desktop.
package main
import (
"bytes"
"context"
"encoding/json"
"errors"
"flag"
"fmt"
"io"
"log"
"os"
"strings"
esv8 "github.com/elastic/go-elasticsearch/v8"
"github.com/elastic/go-elasticsearch/v8/esapi"
)
// response is the part of an Elasticsearch search response body that this
// tool reads: the point-in-time id and the hit documents.
type response struct {
	PITID string     `json:"pit_id"`
	Hits  searchHits `json:"hits"`
}

// searchHits mirrors the "hits" object of a search response. Each hit is kept
// as undecoded JSON so it can be echoed to stdout byte-for-byte.
type searchHits struct {
	Hits []json.RawMessage `json:"hits"`
}
// main is the entry point of a small Elasticsearch search CLI. It parses the
// flags, delegates to run, and exits non-zero if run fails.
//
// Flags:
//
//	-index       comma-separated index list ("*" when paginating and unset)
//	-p           number of result pages; values > 0 open a point in time (PIT)
//	-q           query body, or "-" to read it from stdin
//	-track-hits  value passed as track_total_hits on every request
//	-f           filter_path applied to every request
//
// The API key is read from ELASTICSEARCH_API_KEY. Hits from all pages are
// written to stdout as one JSON document: {"hits":{"hits":[...]}}.
//
// The work happens in run rather than here because log.Fatal calls os.Exit,
// which skips deferred calls. Returning the error lets the deferred PIT close
// run before the process exits.
func main() {
	log.Default().SetFlags(log.Ldate | log.Ltime | log.Llongfile)
	index := flag.String("index", "", "Elasticsearch Index")
	pages := flag.Int("p", 0, "search result pages. 0 or 1 return the first page, >0 creates a PIT")
	query := flag.String("q", "", "Elasticsearch query")
	trackHits := flag.Bool("track-hits", false, "Elasticsearch Track Total Hits")
	filterPath := flag.String("f", "", "set filter_path. be sure to include pit_id if paginating")
	flag.Parse()
	if *pages < 0 {
		fmt.Fprintln(os.Stderr, "page count must be >= 0")
		os.Exit(1)
	}
	if err := run(*index, *pages, *query, *trackHits, *filterPath); err != nil {
		log.Fatal(err)
	}
}

// run executes the search described by the flags and streams the hits to
// stdout. When pages > 0 it pages through the results with a PIT and
// search_after, and closes the PIT before returning (on success or error).
func run(index string, pages int, query string, trackHits bool, filterPath string) error {
	var esConfig esv8.Config
	esConfig.APIKey = os.Getenv("ELASTICSEARCH_API_KEY")
	es, err := esv8.NewClient(esConfig)
	if err != nil {
		return err
	}

	var body io.Reader
	switch {
	case query == "-":
		body = os.Stdin
	case query != "":
		body = strings.NewReader(query)
	}

	ctx := context.Background()

	// paginationQuery is the decoded query that is re-encoded for every page.
	// It is only populated when paginating with a PIT.
	var paginationQuery map[string]interface{}
	// pitID is the most recent PIT id. Elasticsearch may hand back a different
	// id on each page, and the deferred close must release the latest one.
	var pitID string
	if pages > 0 {
		if body == nil {
			return errors.New("no query to paginate over")
		}
		// Parse the query before opening the PIT so that a malformed query
		// does not leave an unused PIT behind.
		dec := json.NewDecoder(body)
		// Keep numbers as json.Number so large integers survive re-encoding
		// exactly instead of being rounded through float64.
		dec.UseNumber()
		var q map[string]interface{}
		if err := dec.Decode(&q); err != nil {
			return fmt.Errorf("while parsing query: %w", err)
		}
		if q == nil {
			// A literal `null` decodes without error into a nil map, which
			// would panic on the assignments below.
			return errors.New("while parsing query: query must be a JSON object")
		}
		if _, ok := q["sort"]; !ok {
			log.Print("missing sort in query")
			q["sort"] = "_shard_doc"
		}

		if index == "" {
			index = "*"
		}
		if pitID, err = openPIT(ctx, es, index); err != nil {
			return err
		}
		// The closure reads pitID at return time, so it closes the latest id.
		defer func() { closePIT(ctx, es, pitID) }()

		q["pit"] = map[string]interface{}{"id": pitID, "keep_alive": "1m"}
		paginationQuery = q

		var buf bytes.Buffer
		if err := json.NewEncoder(&buf).Encode(q); err != nil {
			return fmt.Errorf("while generating query: %w", err)
		}
		body = &buf
	}

	searchOptions := []func(*esapi.SearchRequest){
		es.Search.WithContext(ctx),
		es.Search.WithTrackTotalHits(trackHits),
	}
	if body != nil {
		searchOptions = append(searchOptions, es.Search.WithBody(body))
	}
	// When paginating, the indices were bound when the PIT was opened, so
	// they are only passed here for a plain (non-PIT) search.
	if index != "" && pages == 0 {
		searchOptions = append(searchOptions, es.Search.WithIndex(index))
	}
	if filterPath != "" {
		searchOptions = append(searchOptions, es.Search.WithFilterPath(filterPath))
	}
	rsp, err := search(es, searchOptions)
	if err != nil {
		return err
	}

	fmt.Println(`{"hits":{"hits":[`)
	// The first hit is written without a separator; dumpHits prefixes every
	// later hit with a comma. An empty first page yields an empty array.
	if hits := rsp.Hits.Hits; len(hits) > 0 {
		io.Copy(os.Stdout, bytes.NewReader(hits[0]))
		dumpHits(hits[1:])
	}

	for p := 1; p < pages; p++ {
		hitCount := len(rsp.Hits.Hits)
		if hitCount == 0 {
			break
		}
		// search_after resumes from the sort values of the last hit seen.
		// They are kept as raw JSON so they are sent back exactly as received.
		var last struct {
			Sort []json.RawMessage `json:"sort"`
		}
		if err := json.Unmarshal(rsp.Hits.Hits[hitCount-1], &last); err != nil {
			return fmt.Errorf("while decoding sort from search response: %w", err)
		}
		if len(last.Sort) == 0 {
			return errors.New("last hit has no sort values; include hits.hits.sort in filter_path when paginating")
		}

		switch {
		case rsp.PITID == "":
			// pit_id was likely filtered out of the response (see -f), so
			// keep the previous id rather than sending an empty one.
			log.Printf("search response has no pit_id, reusing %s", pitID)
		case rsp.PITID != pitID:
			log.Printf("PIT changed, old: %s, new: %s", pitID, rsp.PITID)
			pitID = rsp.PITID
		}
		paginationQuery["search_after"] = last.Sort
		paginationQuery["pit"] = map[string]interface{}{"id": pitID, "keep_alive": "1m"}

		var buf bytes.Buffer
		if err := json.NewEncoder(&buf).Encode(paginationQuery); err != nil {
			return fmt.Errorf("while generating query: %w", err)
		}
		pageOptions := []func(*esapi.SearchRequest){
			es.Search.WithContext(ctx),
			es.Search.WithTrackTotalHits(trackHits),
			es.Search.WithBody(&buf),
		}
		if filterPath != "" {
			pageOptions = append(pageOptions, es.Search.WithFilterPath(filterPath))
		}
		if rsp, err = search(es, pageOptions); err != nil {
			return err
		}
		dumpHits(rsp.Hits.Hits)
	}
	fmt.Println("]}}")
	return nil
}

// openPIT opens a point in time over the comma-separated index list with a
// "1m" keep_alive and returns its id. The response body is always closed.
func openPIT(ctx context.Context, es *esv8.Client, index string) (string, error) {
	rsp, err := es.OpenPointInTime(
		strings.Split(index, ","),
		"1m",
		es.OpenPointInTime.WithContext(ctx))
	if err != nil {
		return "", fmt.Errorf("while creating PIT: %w", err)
	}
	defer rsp.Body.Close()
	if rsp.IsError() {
		return "", fmt.Errorf("while creating PIT: %s", rsp.String())
	}
	var pit struct {
		ID string `json:"id"`
	}
	if err := json.NewDecoder(rsp.Body).Decode(&pit); err != nil {
		return "", fmt.Errorf("while parsing PIT response: %w", err)
	}
	return pit.ID, nil
}

// closePIT releases the point in time identified by id. Failures are only
// logged: this runs during cleanup, and the PIT is opened with a short
// ("1m") keep_alive, so leaving it behind is not fatal.
func closePIT(ctx context.Context, es *esv8.Client, id string) {
	// Encode the id through encoding/json rather than string concatenation
	// so it is always correctly escaped.
	payload, err := json.Marshal(map[string]string{"id": id})
	if err != nil {
		log.Println("failed to close PIT: ", err)
		return
	}
	rsp, err := es.ClosePointInTime(
		es.ClosePointInTime.WithContext(ctx),
		es.ClosePointInTime.WithBody(bytes.NewReader(payload)),
	)
	if err != nil {
		log.Println("failed to close PIT: ", err)
		return
	}
	defer rsp.Body.Close()
	if rsp.IsError() {
		log.Println("failed to close PIT: ", rsp.String())
	}
}
// search sends one search request built from opts and decodes the reply
// into a response. If the client reports an error status (IsError), the
// returned error carries the client's string rendering of the reply. The
// HTTP body is always closed before returning.
func search(es *esv8.Client, opts []func(*esapi.SearchRequest)) (*response, error) {
	httpRsp, err := es.Search(opts...)
	if err != nil {
		return nil, err
	}
	defer httpRsp.Body.Close()

	if httpRsp.IsError() {
		return nil, errors.New(httpRsp.String())
	}

	decoded := new(response)
	if err = json.NewDecoder(httpRsp.Body).Decode(decoded); err != nil {
		return nil, err
	}
	return decoded, nil
}
// dumpHits continues a JSON array that is already being streamed to stdout.
// Every hit is written after a ",\n" separator, so the caller must already
// have emitted at least one element. Write errors are ignored.
func dumpHits(hits []json.RawMessage) {
	for i := 0; i < len(hits); i++ {
		fmt.Print(",\n")
		io.Copy(os.Stdout, bytes.NewReader(hits[i]))
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment