Skip to content

Instantly share code, notes, and snippets.

@graphaelli
Created September 23, 2022 19:16
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save graphaelli/f7f20be22543bc244a773fb27e48f612 to your computer and use it in GitHub Desktop.
Save graphaelli/f7f20be22543bc244a773fb27e48f612 to your computer and use it in GitHub Desktop.
package main
import (
"encoding/json"
"errors"
"fmt"
"io"
"log"
"os"
)
type response struct {
Hits struct {
Hits []json.RawMessage `json:"hits"`
} `json:"hits"`
}
func main() {
d := json.NewDecoder(os.Stdin)
e := json.NewEncoder(os.Stdout)
fmt.Println(`{"hits":{"hits": [`)
first := true
for {
var r response
if err := d.Decode(&r); err != nil {
if errors.Is(err, io.EOF) {
break
}
log.Fatal(err)
}
for _, hit := range r.Hits.Hits {
if !first {
fmt.Println(",")
}
e.Encode(hit)
first = false
}
}
fmt.Println("]}}")
}
package main
import (
"bytes"
"context"
"encoding/json"
"flag"
"fmt"
"io"
"log"
"os"
"strings"
esv8 "github.com/elastic/go-elasticsearch/v8"
"github.com/elastic/go-elasticsearch/v8/esapi"
)
func main() {
log.Default().SetFlags(log.Ldate | log.Ltime | log.Llongfile)
index := flag.String("index", "", "Elasticsearch Index")
pages := flag.Int("p", 0, "search result pages. 0 or 1 return the first page, >0 creates a PIT")
query := flag.String("q", "", "Elasticsearch query")
trackHits := flag.Bool("track-hits", false, "Elasticsearch Track Total Hits")
filterPath := flag.String("f", "", "set filter_path. be sure to include pit_id if paginating")
flag.Parse()
if *pages < 0 {
fmt.Fprintf(os.Stderr, "page count must be >= 0")
os.Exit(1)
}
var esConfig esv8.Config
esConfig.APIKey = os.Getenv("ELASTICSEARCH_API_KEY")
es, err := esv8.NewClient(esConfig)
if err != nil {
log.Fatal(err)
}
var body io.Reader
if *query == "-" {
body = os.Stdin
} else if *query != "" {
body = strings.NewReader(*query)
}
ctx := context.Background()
var paginationQuery map[string]interface{}
if *pages > 0 {
if body == nil {
log.Fatal("no query to paginate over")
}
if *index == "" {
*index = "*"
}
rsp, err := es.OpenPointInTime(
strings.Split(*index, ","),
"1m",
es.OpenPointInTime.WithContext(ctx))
if err != nil {
log.Fatal("while creating PIT: ", err)
}
if rsp.IsError() {
log.Fatal("while creating PIT: ", rsp.String())
}
var pit struct {
ID string `json:"id"`
}
if err := json.NewDecoder(rsp.Body).Decode(&pit); err != nil {
log.Fatal("while parsing PIT response: ", err)
}
defer func() {
if _, err := es.ClosePointInTime(
es.ClosePointInTime.WithContext(ctx),
es.ClosePointInTime.WithBody(strings.NewReader(`{"id":"`+pit.ID+`"}`)),
); err != nil {
log.Println("failed to close PIT: ", err)
}
}()
var q map[string]interface{}
if err := json.NewDecoder(body).Decode(&q); err != nil {
log.Fatal("while parsing query: ", err)
}
if _, ok := q["sort"]; !ok {
log.Fatal("missing sort in query")
}
q["pit"] = map[string]interface{}{"id": pit.ID, "keep_alive": "1m"}
paginationQuery = q
var buf bytes.Buffer
if err := json.NewEncoder(&buf).Encode(&q); err != nil {
log.Fatal("while generating query: ", err)
}
body = &buf
}
searchOptions := []func(*esapi.SearchRequest){
es.Search.WithContext(ctx),
es.Search.WithTrackTotalHits(*trackHits),
}
if body != nil {
searchOptions = append(searchOptions, es.Search.WithBody(body))
}
if *index != "" && *pages == 0 {
searchOptions = append(searchOptions, es.Search.WithIndex(*index))
}
if *filterPath != "" {
searchOptions = append(searchOptions, es.Search.WithFilterPath(*filterPath))
}
result := search(es, searchOptions)
for p := 1; p < *pages; p++ {
var r struct {
PITID string `json:"pit_id"`
Hits struct {
Hits []struct {
Sort []interface{} `json:"sort"`
} `json:"hits"`
} `json:"hits"`
}
if err := json.NewDecoder(result).Decode(&r); err != nil {
log.Fatal("while decoding search response: ", err)
}
hitCount := len(r.Hits.Hits)
if hitCount == 0 {
break
}
paginationQuery["search_after"] = r.Hits.Hits[hitCount-1].Sort
if oldPIT := paginationQuery["pit"].(map[string]interface{})["id"]; oldPIT != r.PITID {
log.Printf("PIT changed, old: %s, new: %s", oldPIT, r.PITID)
}
paginationQuery["pit"] = map[string]interface{}{"id": r.PITID, "keep_alive": "1m"}
var buf bytes.Buffer
if err := json.NewEncoder(&buf).Encode(&paginationQuery); err != nil {
log.Fatal("while generating query: ", err)
}
searchOptions = []func(*esapi.SearchRequest){
es.Search.WithContext(ctx),
es.Search.WithTrackTotalHits(*trackHits),
es.Search.WithBody(&buf),
}
if *filterPath != "" {
searchOptions = append(searchOptions, es.Search.WithFilterPath(*filterPath))
}
result = search(es, searchOptions)
}
}
// search is for convenience
func search(es *esv8.Client, opts []func(*esapi.SearchRequest)) *bytes.Buffer {
res, err := es.Search(opts...)
if err != nil {
log.Fatal(err)
}
if res.IsError() {
log.Fatal(res.String())
}
var result bytes.Buffer
if _, err := io.Copy(os.Stdout, io.TeeReader(res.Body, &result)); err != nil {
log.Fatal(err)
}
return &result
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment