Skip to content

Instantly share code, notes, and snippets.

@olivere
Created September 17, 2018 14:59
Show Gist options
  • Save olivere/050fc4313ad112fe536360e1e93ada1e to your computer and use it in GitHub Desktop.
Save olivere/050fc4313ad112fe536360e1e93ada1e to your computer and use it in GitHub Desktop.
issue-907.go
// Copyright 2012-present Oliver Eilhard. All rights reserved.
// Use of this source code is governed by a MIT-license.
// See http://olivere.mit-license.org/license.txt for details.
// Example code for #907.
//
// ./issue-907 -url=http://127.0.0.1:9200 -index=issue-907 -sniff=false -d
//
package main
import (
"context"
"flag"
"fmt"
"log"
"os"
"github.com/olivere/elastic"
)
// mapping is the JSON body sent when creating the index: a single shard
// without replicas, and a "_doc" type whose container_name and pod_name
// fields are keywords and whose @timestamp field is a date.
const mapping = `
{
"settings":{
"number_of_shards":1,
"number_of_replicas":0
},
"mappings":{
"_doc":{
"properties":{
"container_name": {
"type":"keyword"
},
"pod_name": {
"type":"keyword"
},
"@timestamp": {
"type":"date"
}
}
}
}
}`
// Doc is a sample document stored in the index. Its JSON field names match
// the properties declared in the index mapping.
type Doc struct {
	// ContainerName is serialized as "container_name".
	ContainerName string `json:"container_name"`
	// PodName is serialized as "pod_name".
	PodName string `json:"pod_name"`
	// Timestamp is serialized as "@timestamp". It is kept as a plain string
	// here; a time.Time is the alternative noted by the original author.
	Timestamp string `json:"@timestamp"`
}
// main is the reproduction program for issue #907. It recreates the index
// named by -index, bulk-loads a handful of sample documents, and runs a bool
// query combined with nested terms aggregations against them.
//
// Flags:
//
//	-url    Elasticsearch URL; an empty value falls back to http://127.0.0.1:9200
//	-sniff  enable or disable sniffing
//	-index  index name (required); an existing index of that name is deleted
//	-d      install a trace logger that writes to stdout
//
// Every failure, including a rejected bulk item, ends the program through
// log.Fatal.
func main() {
	var (
		url   = flag.String("url", "http://localhost:9200", "Elasticsearch URL")
		sniff = flag.Bool("sniff", true, "Enable or disable sniffing")
		index = flag.String("index", "", "Index name")
		debug = flag.Bool("d", false, "Enable debug output")
	)
	flag.Parse()
	log.SetFlags(0)

	if *url == "" {
		*url = "http://127.0.0.1:9200"
	}
	if *index == "" {
		log.Fatal("please specify an index name -index")
	}

	// One background context is shared by all requests below.
	ctx := context.Background()

	// Create an Elasticsearch client.
	options := []elastic.ClientOptionFunc{
		elastic.SetURL(*url),
		elastic.SetSniff(*sniff),
	}
	if *debug {
		logger := log.New(os.Stdout, "", 0)
		options = append(options, elastic.SetTraceLog(logger))
	}
	client, err := elastic.NewClient(options...)
	if err != nil {
		log.Fatal(err)
	}

	// Just a status message.
	fmt.Println("Connected")

	// Drop the index if it already exists, then create it afresh with the
	// mapping so every run starts from the same state.
	exists, err := client.IndexExists(*index).Do(ctx)
	if err != nil {
		log.Fatal(err)
	}
	if exists {
		if _, err := client.DeleteIndex(*index).Do(ctx); err != nil {
			log.Fatal(err)
		}
	}
	if _, err := client.CreateIndex(*index).Body(mapping).Do(ctx); err != nil {
		log.Fatal(err)
	}

	// Add some docs via the Bulk API.
	{
		docs := []Doc{
			{ContainerName: "search", PodName: "search-7ah77z", Timestamp: "2016-01-01"},
			{ContainerName: "maps", PodName: "maps-h19327k", Timestamp: "2015-01-01"},
			{ContainerName: "search", PodName: "search-8jkllm", Timestamp: "2017-01-01"},
			{ContainerName: "mail", PodName: "mail-o910ak0", Timestamp: "2018-01-01"},
			{ContainerName: "search", PodName: "search-099910", Timestamp: "2018-01-01"},
			{ContainerName: "search", PodName: "search-99kal10", Timestamp: "2018-01-01"},
			{ContainerName: "search", PodName: "search-azz910", Timestamp: "2018-01-01"},
		}

		// Document IDs are the 1-based position in docs.
		reqs := make([]elastic.BulkableRequest, 0, len(docs))
		for i, doc := range docs {
			req := elastic.NewBulkIndexRequest().
				Id(fmt.Sprint(i + 1)).
				Doc(doc)
			reqs = append(reqs, req)
		}

		// Write the bulk requests and wait for the refresh so the documents
		// are searchable by the time the query runs.
		bulkRes, err := client.Bulk().
			Index(*index).
			Type("_doc").
			Add(reqs...).
			Refresh("wait_for").
			Do(ctx)
		if err != nil {
			log.Fatal(err)
		}

		// A bulk call can succeed as a whole while individual items are
		// rejected, so the per-item results have to be checked as well.
		if failed := bulkRes.Failed(); len(failed) > 0 {
			for _, item := range failed {
				if item.Error != nil {
					log.Printf("bulk item %s failed: %s: %s", item.Id, item.Error.Type, item.Error.Reason)
				} else {
					log.Printf("bulk item %s failed with status %d", item.Id, item.Status)
				}
			}
			log.Fatalf("%d of %d bulk requests failed", len(failed), len(reqs))
		}
	}

	// Now run a query: container_name must be "search", filtered to
	// timestamps between 2017-01-01 and now.
	termQ := elastic.NewTermQuery("container_name", "search")
	rangeQ := elastic.NewRangeQuery("@timestamp").Gte("2017-01-01").Lte("now")
	boolQ := elastic.NewBoolQuery().Must(termQ).Filter(rangeQ)

	// Terms aggregation on @timestamp with a nested terms aggregation on
	// pod_name registered under the name "pods".
	podNameAgg := elastic.NewTermsAggregation().
		Field("pod_name").
		OrderByCountDesc()
	timestampAgg := elastic.NewTermsAggregation().
		Field("@timestamp").
		OrderByTermDesc().
		SubAggregation("pods", podNameAgg)

	res, err := client.Search(*index).
		Type("_doc").
		Query(boolQ).
		Aggregation("timestamps", timestampAgg).
		Sort("pod_name", false).
		Pretty(true).
		Do(ctx)
	if err != nil {
		log.Fatal(err)
	}
	if n := res.TotalHits(); n > 0 {
		log.Printf("Found a total of %d records\n", n)
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment