Skip to content

Instantly share code, notes, and snippets.

@kirk91
Last active July 11, 2017 09:00
Show Gist options
  • Save kirk91/7c7c0f6a90e7bf2feede41dde77e0bdd to your computer and use it in GitHub Desktop.
Save kirk91/7c7c0f6a90e7bf2feede41dde77e0bdd to your computer and use it in GitHub Desktop.
package main
import (
"context"
"flag"
"fmt"
"log"
"runtime"
"strings"
"time"
"github.com/google/uuid"
"gopkg.in/olivere/elastic.v3"
)
// Command-line flags. Each variable is a pointer that holds the flag's
// default until flag.Parse runs in main, after which it holds the value
// supplied on the command line.
var (
	// Where to connect: the node list and whether the client should sniff
	// the cluster for additional nodes.

	addrs = flag.String("addrs", "localhost:9200", "elasticsearch addresses splited by comma")

	sniff = flag.Bool("sniff", true, "sniff elasticsearch nodes info")

	// Where to write: the index and document type passed to every bulk
	// request.

	index = flag.String("index", "arch_taco", "elasticsearch index")

	typ = flag.String("type", "arch_taco", "elasticsearch index type")

	// How hard to push: the number of worker goroutines started by main.

	concurrency = flag.Int("concurrency", runtime.NumCPU(), "concurrency number, defaults to the number of cpu cores.")
)
// Record is the document body indexed into Elasticsearch. The struct tags
// define the JSON key used for each field when the record is serialized.
type Record struct {
	// MsgID is written under the "msg_id" key.
	MsgID string `json:"msg_id"`

	// Channel is written under the "channel" key.
	Channel string `json:"channel"`

	// State is written under the "state" key.
	State string `json:"state"`

	// CreatedAt is a textual timestamp written under the "created_at" key.
	CreatedAt string `json:"created_at"`

	// Module is written under the "module" key.
	Module string `json:"module"`

	// Info is an opaque payload written under the "info" key. Because it is
	// a []byte, encoding/json emits it as a base64-encoded string.
	Info []byte `json:"info"`
}
func main() {
flag.Parse()
var urls []string
for _, addr := range strings.Split(*addrs, ",") {
urls = append(urls, "http://"+addr)
}
if len(urls) == 0 {
panic("invalid addrs")
}
// var clients []*elastic.Client
// for i := 0; i < *concurrency; i++ {
client, err := elastic.NewClient(elastic.SetURL(urls...), elastic.SetSniff(*sniff))
if err != nil {
panic(err)
}
// clients = append(clients, client)
// }
recordC := make(chan *Record, *concurrency*10)
ctx, cancel := context.WithCancel(context.Background())
go func() {
for {
select {
case <-ctx.Done():
return
default:
record := &Record{
MsgID: uuid.New().String(),
Channel: "xiaomi",
State: "dispatched",
CreatedAt: time.Now().Format("2017-06-28 13:50:37"),
Module: "message",
Info: make([]byte, 512),
}
recordC <- record
}
}
}()
for i := 0; i < *concurrency; i++ {
// client := clients[i]
go func() {
bulk := client.Bulk().Index(*index).Type(*typ)
requests := make([]*elastic.BulkIndexRequest, 0, 10)
for {
select {
case <-ctx.Done():
return
case record := <-recordC:
request := elastic.NewBulkIndexRequest().Id(uuid.New().String()).Doc(record)
requests = append(requests, request)
if len(requests) < 10 {
continue
}
case <-time.After(5 * time.Second):
if len(requests) == 0 {
continue
}
}
for _, request := range requests {
bulk.Add(request)
}
_, err := bulk.Do()
if err != nil {
log.Println("send the batched requests to elasticsearch failed:", err)
}
requests = requests[:0]
}
}()
}
var s string
fmt.Scan(&s)
cancel()
time.Sleep(1 * time.Second)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment