Skip to content

Instantly share code, notes, and snippets.

@joshrendek
Last active October 17, 2015 15:17
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save joshrendek/5c2410eb63db0b5f162d to your computer and use it in GitHub Desktop.
Save joshrendek/5c2410eb63db0b5f162d to your computer and use it in GitHub Desktop.
package main
import (
"fmt"
"github.com/satori/go.uuid"
"os"
//"crypto/tls"
"bytes"
"encoding/json"
"github.com/Sirupsen/logrus"
"io/ioutil"
"net"
"net/http"
"net/url"
"sync"
"time"
)
// Package-level state shared by the producer in main and the worker pool.
var (
	// log is the logrus logger used for all program output.
	log = logrus.New()

	// esHttpClient is the single HTTP client reused for every request.
	esHttpClient = GetHttpClient()

	// esQueue carries records from the producer to the workers; it buffers
	// one record.
	esQueue = make(chan *Record, 1)

	// done carries the signal sent by startEsWorkers; it buffers one value.
	done = make(chan bool, 1)

	// wg has one entry added for each worker goroutine.
	wg sync.WaitGroup

	// mutex is held by workers while they increment total_es_records.
	mutex sync.Mutex

	// total_es_records counts the records the workers have handled so far.
	total_es_records int
)
// DefaultDialer is the dialer used for every connection the client opens.
// It allows 10 seconds to establish a connection and uses a 10 second
// keep-alive period.
var DefaultDialer = &net.Dialer{Timeout: 10 * time.Second, KeepAlive: 10 * time.Second}

// GetHttpClient builds the HTTP client used to talk to Elasticsearch.
//
// The transport dials through DefaultDialer and keeps up to 1100 idle
// connections per host. If the HTTPS_PROXY environment variable holds a
// parseable URL, all requests are routed through that proxy. If it is set but
// cannot be parsed, the problem is reported on stderr and no proxy is
// configured, instead of silently installing a proxy function that yields a
// nil URL.
func GetHttpClient() http.Client {
	tr := &http.Transport{
		//TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
		Dial: DefaultDialer.Dial,
		// TLSHandshakeTimeout: 2 * time.Second,
		MaxIdleConnsPerHost: 1100,
	}

	if raw := os.Getenv("HTTPS_PROXY"); raw != "" {
		proxyURL, err := url.Parse(raw)
		if err != nil {
			// This runs during package initialisation, so write straight to
			// stderr rather than depending on another package-level variable.
			fmt.Fprintf(os.Stderr, "ignoring invalid HTTPS_PROXY %q: %v\n", raw, err)
		} else {
			tr.Proxy = http.ProxyURL(proxyURL)
		}
	}

	return http.Client{Transport: tr}
}
// Record is one document to be indexed.
//
// Name and Time are exported and therefore form the JSON body produced by
// encoding/json. id is unexported, so it is left out of the body; SendToES
// uses it only as the last path segment of the document URL.
type Record struct {
	id         string
	Name, Time string
}
// SendToES indexes the record by POSTing its JSON encoding to
// http://elasticsearch:9200/foobar/example_type/<id> through esHttpClient.
//
// Transport-level failures are retried with a linearly growing pause
// (0s, 1s, 2s, ...). Once more than 100 attempts have failed, the last error
// is logged and the record is dropped. A record that cannot be encoded, or
// for which no request can be built, is logged and dropped immediately,
// because retrying cannot change the outcome. A response with a non-2xx
// status is logged as an error but not retried. When the DEBUG environment
// variable is "true" the response body is logged.
func (s *Record) SendToES() {
	const maxRetries = 100

	endpoint := fmt.Sprintf("http://elasticsearch:9200/foobar/example_type/%s", s.id)

	// The payload is identical for every attempt, so encode it once. Sending
	// after a marshalling failure would post an empty body.
	payload, err := json.Marshal(s)
	if err != nil {
		log.Error("Error marshalling record: ", err)
		return
	}

	retries := 0
	for {
		// A fresh reader is needed per attempt: a failed attempt may already
		// have consumed part of the previous one.
		req, err := http.NewRequest("POST", endpoint, bytes.NewReader(payload))
		if err != nil {
			// req is nil here; carrying on would panic on req.Header.
			log.Error(err)
			return
		}
		req.Header.Set("Content-Type", "application/json")
		//req.Header.Set("Connection", "close") // 997,120 docs
		req.Header.Set("Connection", "keep-alive") // 700k

		resp, err := esHttpClient.Do(req)
		if err != nil {
			time.Sleep(time.Duration(retries) * time.Second)
			retries++
			if retries > maxRetries {
				log.Errorf("ES errored %dx: %v", retries, err)
				return
			}
			continue
		}

		// Read the body to the end before closing it so the transport can
		// reuse the connection.
		body, err := ioutil.ReadAll(resp.Body)
		resp.Body.Close()
		if err != nil {
			log.Error(err)
		}
		if resp.StatusCode < 200 || resp.StatusCode > 299 {
			// Make rejected documents visible instead of counting them as sent.
			log.Errorf("ES returned %s for record %s", resp.Status, s.id)
		}
		if os.Getenv("DEBUG") == "true" {
			log.Info(string(body))
		}
		return
	}
}
// startEsWorkers runs the pool of 1200 worker goroutines and blocks until
// every one of them has finished.
//
// Each worker receives records from esQueue, sends them with SendToES and
// increments total_es_records under mutex. A worker exits when esQueue is
// closed and drained. A value is sent on done only after wg.Wait returns, so
// a receiver of done knows all queued records have been handled.
func startEsWorkers() {
	const workers = 1200

	// Register every worker before any of them can call Done.
	wg.Add(workers)
	for i := 0; i < workers; i++ {
		go func() {
			defer wg.Done()
			for r := range esQueue {
				r.SendToES()
				mutex.Lock()
				total_es_records++
				mutex.Unlock()
			}
		}()
	}

	// Signalling before the workers finish would report completion while
	// requests are still in flight.
	wg.Wait()
	done <- true
}
// main queues one million sample records for the worker pool, waits until
// the workers have handled all of them, logs the total and exits.
func main() {
	go startEsWorkers()

	for i := 0; i < 1000000; i++ {
		esQueue <- &Record{id: uuid.NewV4().String(), Name: "John Doe", Time: time.Now().String()}
	}
	// Closing the queue lets each worker's range loop end once it is drained.
	close(esQueue)

	// startEsWorkers sends on done only after it has registered all workers
	// with wg, so wg.Wait cannot run ahead of the Add calls.
	<-done
	wg.Wait()

	// Read the counter under the same mutex the workers use to update it.
	mutex.Lock()
	sent := total_es_records
	mutex.Unlock()
	log.Info("Finished queueing: ", sent)
}
#!/bin/bash
# Recreate the "foobar" index with 10 shards and no replicas, then run the
# loader binary against it.

# Drop any index left over from a previous run.
curl -XDELETE elasticsearch:9200/foobar

# The index name must match the one deleted above and the one ./main writes
# to; otherwise the settings below never apply to the index under test.
curl -XPUT elasticsearch:9200/foobar -d '{
"settings" : {
"number_of_shards" : 10,
"number_of_replicas" : 0
}
}'

./main
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment