Skip to content

Instantly share code, notes, and snippets.

@philipjkim
Last active June 16, 2021 18:02
Show Gist options
  • Star 7 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save philipjkim/8f5166b2e36a828d0d451ecae3042a15 to your computer and use it in GitHub Desktop.
Save philipjkim/8f5166b2e36a828d0d451ecae3042a15 to your computer and use it in GitHub Desktop.
Go Elasticsearch aggregation example
package main
import (
"fmt"
"time"
"encoding/json"
elastic "gopkg.in/olivere/elastic.v3"
)
/* sample ES data (`log` is the field we're analyzing)
[
{
"_index":"idx-2016.11.11",
"_type":"type",
"_id":"AVhRuy9FsA_K7PKVoIPw",
"_score":1.3449254,
"_source":{
"log":"[logOrigin] my_home ",
"stream":"stdout",
"docker":{
"container_id":"01a5362faf335e37bf54283ec2ded0556536439d75fe806eb8f89eefc06e077e"
},
"kubernetes":{
"namespace_name":"myns",
"pod_id":"8390f2ef-a7ca-11e6-830e-fa163ed354c9",
"labels":{
"branch":"master",
"service":"mysvc"
},
},
"@timestamp":"2016-11-11T13:52:40+09:00"
}
},
{
"_index":"idx-2016.11.11",
"_type":"type",
"_id":"AVhRuy9FsA_K7PKVoIPy",
"_score":1.3449254,
"_source":{
"log":"[logOrigin] my_office ",
"stream":"stdout",
"docker":{
"container_id":"01a5362faf335e37bf54283ec2ded0556536439d75fe806eb8f89eefc06e077e"
},
"kubernetes":{
"namespace_name":"myns",
"pod_id":"8390f2ef-a7ca-11e6-830e-fa163ed354c9",
"labels":{
"branch":"master",
"service":"mysvc"
},
},
"@timestamp":"2016-11-11T13:52:40+09:00"
}
}
]
*/
const (
url = "http://my_es_hostname:9200"
aggregationName = "origins"
)
var client *elastic.Client
func init() {
c, err := elastic.NewClient(elastic.SetURL(url), elastic.SetMaxRetries(10))
if err != nil {
fmt.Printf("elasticsearch init failed: %v\n", err)
panic(err)
}
client = c
}
func main() {
now := time.Now()
formatForES := "2006-01-02T15:04:05-07:00"
nowStr := now.Format(formatForES)
ltStr := nowStr[:15] + "0:00+09:00"
lt, _ := time.Parse(formatForES, ltStr)
gt := lt.Add(time.Duration(-1) * time.Hour)
gtStr := gt.Format(formatForES)
fmt.Printf("## %v ~ %v\n", gtStr, ltStr)
boolQuery := elastic.NewBoolQuery()
boolQuery.Must(
elastic.NewTermQuery("kubernetes.labels.service", "mysvc"),
elastic.NewTermQuery("kubernetes.labels.branch", "master"),
elastic.NewQueryStringQuery("logOrigin"),
elastic.NewRangeQuery("@timestamp").
Gt(gtStr).
Lt(ltStr),
)
index := "idx-" + now.Format("2006.01.02")
searchResult, err := getResult(index, boolQuery)
if err != nil {
fmt.Printf("search failed: %v\n", err)
return
}
fmt.Printf("total hits: %v\n\n", searchResult.Hits.TotalHits)
printResult(searchResult)
}
func getResult(index string, query elastic.Query) (*elastic.SearchResult, error) {
aggr := elastic.NewTermsAggregation().Field("log").Size(20)
return client.Search().
Index(index).
Query(query).
Size(0).
Aggregation(aggregationName, aggr).
Pretty(true).
Do()
}
func printResult(result *elastic.SearchResult) {
rawMsg := result.Aggregations[aggregationName]
var ar elastic.AggregationBucketKeyItems
err := json.Unmarshal(*rawMsg, &ar)
if err != nil {
fmt.Printf("Unmarshal failed: %v\n", err)
return
}
for _, item := range ar.Buckets {
fmt.Printf("%v: %v\n", item.Key, item.DocCount)
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment