Last active
November 7, 2017 18:37
-
-
Save jaytaylor/997ccc3690ccd3ac5196211aff59d989 to your computer and use it in GitHub Desktop.
Kubernetes Elasticsearch indexing performance testing tool.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"context" | |
"flag" | |
"fmt" | |
"os" | |
"path/filepath" | |
"regexp" | |
"sync" | |
"sync/atomic" | |
"time" | |
"github.com/icrowley/fake" | |
"github.com/robfig/cron" | |
log "github.com/sirupsen/logrus" | |
"gopkg.in/olivere/elastic.v5" | |
"k8s.io/api/core/v1" | |
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" | |
"k8s.io/client-go/kubernetes" | |
"k8s.io/client-go/tools/clientcmd" | |
) | |
func init() { | |
log.SetLevel(log.DebugLevel) | |
log.SetFormatter(&log.TextFormatter{ | |
FullTimestamp: true, | |
}) | |
} | |
// PoolChanSize is the buffer capacity used for the job and result channels.
const PoolChanSize = 100

// DefaultBulkIndexingBatchSize is the default value of the -batch-size flag,
// i.e. the number of documents placed in each bulk indexing request.
const DefaultBulkIndexingBatchSize = 1000
func main() { | |
var ( | |
kubeconfig *string | |
podExpr *string | |
numIndexes *int | |
startIndexOffset *int | |
numDocuments *int | |
workerMultiplier *int | |
bulkIndexingBatchSize *int | |
) | |
{ | |
if home := homeDir(); home != "" { | |
kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "(optional) absolute path to the kubeconfig file") | |
} else { | |
kubeconfig = flag.String("kubeconfig", "", "absolute path to the kubeconfig file") | |
} | |
podExpr = flag.String("pod-expr", ".*", "pod name regular expression to match") | |
numIndexes = flag.Int("num-indexes", 0, "number of indexes to create") | |
startIndexOffset = flag.Int("start-index-offset", 0, "index offset to begin at (for partial runs)") | |
numDocuments = flag.Int("num-documents", 1, "number of documents to push to each index") | |
workerMultiplier = flag.Int("worker-multiplier", 1, "multiplier for how many workers to launch relative to matched pods") | |
bulkIndexingBatchSize = flag.Int("batch-size", DefaultBulkIndexingBatchSize, "number of docs for each bulk indexing batch") | |
flag.Parse() | |
} | |
// Use the current context in kubeconfig. | |
config, err := clientcmd.BuildConfigFromFlags("", *kubeconfig) | |
if err != nil { | |
panic(err) | |
} | |
// Create the clientset. | |
clientset, err := kubernetes.NewForConfig(config) | |
if err != nil { | |
panic(err) | |
} | |
p := &PodUtil{ | |
Clientset: clientset, | |
} | |
urls, err := p.PodURLs(*podExpr) | |
if err != nil { | |
panic(err) | |
} | |
log.Infof("Found %v ES URLs in k8s pod in preliminary check: %+v", len(urls), urls) | |
urlsProviderFn := func() ([]string, error) { return p.PodURLs(*podExpr) } | |
jobInfo := JobInfo{ | |
NumIndexes: *numIndexes, | |
StartIndexOffset: *startIndexOffset, | |
NumDocuments: *numDocuments, | |
WorkerMultiplier: *workerMultiplier, | |
BulkIndexingBatchSize: *bulkIndexingBatchSize, | |
} | |
if err := Start(urlsProviderFn, jobInfo); err != nil { | |
panic(err) | |
} | |
} | |
// JobInfo describes a single indexing run: which indexes to populate, how
// many documents each one receives, and how the work is batched and
// parallelized.
type JobInfo struct {
	// NumIndexes is the exclusive upper bound of the index numbers to populate.
	NumIndexes int
	// StartIndexOffset is the first index number to populate (for partial runs).
	StartIndexOffset int
	// NumDocuments is the number of documents generated for each index.
	NumDocuments int
	// WorkerMultiplier is multiplied by the number of Elasticsearch URLs to
	// obtain the number of indexing workers.
	WorkerMultiplier int
	// BulkIndexingBatchSize is the number of documents per bulk request.
	BulkIndexingBatchSize int
}
// IndexRequest is one document together with the index and document type it
// should be stored under.
type IndexRequest struct {
	// Index is the name of the target Elasticsearch index.
	Index string
	// DocType is the document type passed along with the document.
	DocType string
	// Doc is the document body handed to the Elasticsearch client.
	Doc interface{}
}
type BulkIndexRequest struct { | |
IndexRequests []*IndexRequest | |
} | |
type BulkIndexResult struct { | |
Count int64 | |
Response *elastic.BulkResponse | |
Err error | |
} | |
func Start(podESURLsProvider func() ([]string, error), jobInfo JobInfo) error { | |
var ( | |
jobsCh = make(chan *BulkIndexRequest, PoolChanSize) | |
resultsCh = make(chan *BulkIndexResult, PoolChanSize) | |
wg sync.WaitGroup | |
numDocs int64 | |
numIndexes int64 | |
numErrs int64 | |
c = RateMonitorCron(5, &numDocs, &numIndexes, &numErrs) | |
) | |
urls, err := podESURLsProvider() | |
if err != nil { | |
return err | |
} | |
client, err := elastic.NewClient(elastic.SetURL(urls...)) | |
if err != nil { | |
return err | |
} | |
indexingWorker := func(id int, wg *sync.WaitGroup, jobsCh <-chan *BulkIndexRequest, resultsCh chan<- *BulkIndexResult) { | |
log.Infof("indexingWorker id=%v starting", id) | |
for { | |
select { | |
case bir, ok := <-jobsCh: | |
if !ok { | |
log.Infof("indexingWorker id=%v shutting down", id) | |
return | |
} | |
resultsCh <- BulkIndex(client, bir) | |
wg.Done() | |
} | |
} | |
} | |
workProducer := func(jobsCh chan<- *BulkIndexRequest, wg *sync.WaitGroup, startedSigCh chan struct{}) { | |
log.Info("workProducer starting") | |
bir := newBulkIndexRequest() | |
for i := jobInfo.StartIndexOffset; i < jobInfo.NumIndexes; i++ { | |
for j := 0; j < jobInfo.NumDocuments; j++ { | |
bir.IndexRequests = append(bir.IndexRequests, newIndexRequest(i, genDoc(j))) | |
if len(bir.IndexRequests) >= jobInfo.BulkIndexingBatchSize { | |
wg.Add(1) | |
if startedSigCh != nil { | |
startedSigCh <- struct{}{} | |
startedSigCh = nil | |
} | |
jobsCh <- bir | |
bir = newBulkIndexRequest() | |
} | |
} | |
} | |
if len(bir.IndexRequests) > 0 { | |
// Backfill any leftovers. | |
wg.Add(1) | |
jobsCh <- bir | |
} | |
log.Info("workProducer finished, shutting down") | |
if startedSigCh != nil { | |
startedSigCh <- struct{}{} | |
startedSigCh = nil | |
} | |
} | |
startedSigCh := make(chan struct{}) | |
go workProducer(jobsCh, &wg, startedSigCh) | |
for i := 0; i < len(urls)*jobInfo.WorkerMultiplier; i++ { | |
go indexingWorker(i+1, &wg, jobsCh, resultsCh) | |
} | |
go func() { | |
<-startedSigCh | |
close(startedSigCh) | |
wg.Wait() | |
close(jobsCh) | |
close(resultsCh) | |
}() | |
c.Start() | |
defer c.Stop() | |
seenIndexes := map[string]struct{}{} | |
for { | |
select { | |
case result, ok := <-resultsCh: | |
if !ok { | |
log.Infof("resultsCh closed, batch completed with %v errors", atomic.LoadInt64(&numErrs)) | |
return nil | |
} | |
if result.Err != nil { | |
atomic.AddInt64(&numErrs, result.Count) | |
log.Errorf("Result: %+v", *result) | |
} else { | |
atomic.AddInt64(&numDocs, int64(len(result.Response.Succeeded()))) | |
atomic.AddInt64(&numErrs, int64(len(result.Response.Failed()))) | |
for _, r := range result.Response.Succeeded() { | |
if _, ok := seenIndexes[r.Index]; !ok { | |
seenIndexes[r.Index] = struct{}{} | |
} | |
} | |
atomic.StoreInt64(&numIndexes, int64(len(seenIndexes))) | |
} | |
} | |
} | |
} | |
func BulkIndex(client *elastic.Client, bir *BulkIndexRequest) (result *BulkIndexResult) { | |
result = &BulkIndexResult{ | |
Count: int64(len(bir.IndexRequests)), | |
} | |
bulk := client.Bulk() | |
for _, ir := range bir.IndexRequests { | |
bulkReq := elastic.NewBulkIndexRequest().Index(ir.Index).Type(ir.DocType).Doc(ir.Doc) | |
bulk.Add(bulkReq) | |
} | |
if result.Response, result.Err = bulk.Do(context.TODO()); result.Err != nil { | |
return | |
} | |
return | |
} | |
func newBulkIndexRequest() *BulkIndexRequest { | |
bir := &BulkIndexRequest{ | |
IndexRequests: []*IndexRequest{}, | |
} | |
return bir | |
} | |
func newIndexRequest(i int, doc interface{}) *IndexRequest { | |
ir := &IndexRequest{ | |
Index: fmt.Sprintf("test_index_%v", i), | |
DocType: fmt.Sprintf("doctype_%v", i), | |
Doc: doc, | |
} | |
return ir | |
} | |
func genDoc(j int) interface{} { | |
return map[string]interface{}{ | |
"user": fmt.Sprintf("%v%v", os.Getenv("USER"), j), | |
"name": fake.FullName(), | |
"content": fake.Paragraph(), | |
"timestamp": time.Now().Unix(), | |
"nanos": fmt.Sprintf("%v", time.Now().UnixNano()), | |
} | |
} | |
func RateMonitorCron(frequencySecs int64, numDocs *int64, numIndexes *int64, numErrs *int64) *cron.Cron { | |
var ( | |
c = cron.New() | |
last = map[string]int64{} | |
createdAt = time.Now().Unix() | |
) | |
c.AddFunc(fmt.Sprintf("@every %vs", frequencySecs), func() { | |
var ( | |
latestDocs = atomic.LoadInt64(numDocs) | |
latestIndexes = atomic.LoadInt64(numIndexes) | |
latestErrors = atomic.LoadInt64(numErrs) | |
) | |
log. | |
WithField("docs-indexing-rate", fmt.Sprintf("%v/s", (latestDocs-last["results"])/frequencySecs)). | |
WithField("elapsed-secs", time.Now().Unix()-createdAt). | |
WithField("error-rate", fmt.Sprintf("%v/s", (latestErrors-last["errs"])/frequencySecs)). | |
WithField("total-docs", latestDocs). | |
WithField("total-indexes", latestIndexes). | |
WithField("total-errors", latestErrors). | |
Info("") | |
// log.Infof("results rate=%v/s (%v total) | error rate=%v/s (%v total)", (latestDocs-last["results"])/frequencySecs, latestDocs, (latestErrors-last["errs"])/frequencySecs, latestErrors) | |
last["results"] = latestDocs | |
last["errs"] = latestErrors | |
}) | |
return c | |
} | |
type PodUtil struct { | |
Clientset *kubernetes.Clientset | |
} | |
func (p *PodUtil) PodURLs(expr string) ([]string, error) { | |
pods, err := p.FindPods(expr) | |
if err != nil { | |
return nil, err | |
} | |
ips := []string{} | |
for _, pod := range pods { | |
ips = append(ips, fmt.Sprintf("http://%v:9200", pod.Status.PodIP)) | |
} | |
return ips, nil | |
} | |
func (p *PodUtil) FindPods(expr string) ([]v1.Pod, error) { | |
pods, err := p.Clientset.CoreV1().Pods("").List(metav1.ListOptions{}) | |
if err != nil { | |
return nil, err | |
} | |
x, err := regexp.Compile(expr) | |
if err != nil { | |
return nil, fmt.Errorf("compiling regular expression %q: %s", err) | |
} | |
matches := []v1.Pod{} | |
for _, pod := range pods.Items { | |
if x.MatchString(pod.Name) { | |
matches = append(matches, pod) | |
} | |
} | |
if len(matches) == 0 { | |
return nil, fmt.Errorf("no pods matching %q were found", expr) | |
} | |
return matches, nil | |
} | |
// homeDir returns the current user's home directory as given by the
// environment: $HOME when it is set and non-empty, otherwise %USERPROFILE%
// (the Windows equivalent). The result is empty when neither is set.
func homeDir() string {
	for _, key := range []string{"HOME", "USERPROFILE"} {
		if dir := os.Getenv(key); dir != "" {
			return dir
		}
	}
	return ""
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env bash
#
# Runs the Elasticsearch indexing benchmark (main.go) once for every
# "<num-indexes> <num-docs-per-index>" pair listed below. Before each run all
# data is deleted from the cluster through a randomly chosen client pod; the
# output of each run is saved under test-logs/.
#
# Environment overrides:
#   batchSize   bulk indexing batch size passed to main.go (default: 20000)
#   clientExpr  regular expression matching the ES client pod names

set -o errexit
set -o pipefail
set -o nounset
set -x

if ! command -v jq ; then
    echo 'error: jq is required but was not found in $PATH - "sudo yum install jq", perhaps?' 1>&2
    exit 1
fi

cd "$(dirname "$0")"

batchSize="${batchSize:-20000}"
clientExpr="${clientExpr:-k8s-.*-client.*}"
mult=2

# One "<num-indexes> <num-docs-per-index>" pair per line.
idxDocCounts='100 20000
500 4000
1000 2000
2000 1000
4000 500
8000 250'

mkdir -p test-logs

# The pairs are fed in on fd 3 so that commands inside the loop cannot consume
# them through stdin.
while read -r numIdx numDocs <&3 ; do
    [ -n "${numIdx}" ] || continue

    # Pick one client pod that has an IP address, at random.
    clientAddr="$(
        kubectl get pods -o json \
            | jq -r '.items[] | select(.status.podIP != "") | "\(.metadata.name) \(.status.podIP)"' \
            | grep "${clientExpr}" \
            | cut -d ' ' -f 2 \
            | shuf -n 1 \
    )"

    # Purge all data from the cluster. curl runs as a command of its own (not
    # as the left-hand side of `&&`) so that errexit stops the script when the
    # purge fails, instead of benchmarking against leftover data.
    curl -sS -XDELETE "http://${clientAddr}:9200/*"
    echo ''

    go run main.go \
        -pod-expr "${clientExpr}" \
        -num-documents="${numDocs}" \
        -num-indexes="${numIdx}" \
        -start-index-offset=0 \
        -batch-size="${batchSize}" \
        -worker-multiplier="${mult}" 2>&1 \
        | tee "test-logs/num-indexes-${numIdx}_num-docs-${numDocs}.txt"
done 3<<< "${idxDocCounts}"
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# -*- coding: utf-8 -*- | |
""" | |
Elasticsearch performance graphing | |
-------------------------------------------------------------------------------- | |
If running this script on macOS gives no output or an error like: | |
**RuntimeError**: Python is not installed as a framework. The Mac OS X | |
backend will not be able to function correctly if Python is not installed | |
as a framework. See the Python documentation for more information on | |
installing Python as a framework on Mac OS X. Please either reinstall Python | |
as a framework, or try one of the other backends. | |
Then do this: | |
Create file ~/.matplotlib/matplotlibrc with the line: | |
backend: TkAgg | |
--- | |
Also see: | |
https://stackoverflow.com/questions/21784641/installation-issue-with-matplotlib-python | |
""" | |
import glob | |
import os | |
import subprocess | |
#from itertools import cycle | |
from cycler import cycler | |
import matplotlib.lines as mlines | |
import matplotlib.pyplot as plt | |
import numpy as np | |
from scipy.interpolate import spline | |
## TODO: | |
## Automatically set a working backend if none is configured when the script is run.
#open(os.getenv('HOME') + '/.matplotlib/matplotlibrc | |
## Data for plotting | |
#t = np.arange(0.0, 2.0, 0.3) | |
#s = 1 + np.sin(2 * np.pi * t) | |
# | |
## Note that using plt.subplots below is equivalent to using | |
## fig = plt.figure and then ax = fig.add_subplot(111) | |
#fig, ax = plt.subplots() | |
#ax.plot(t, s) | |
# | |
#ax.set(xlabel='time (s)', ylabel='voltage (mV)', | |
# title='About as simple as it gets, folks') | |
#ax.grid() | |
# | |
#fig.savefig("test.png") | |
#plt.show() | |
def get_col_dim(filename, col_num):
    """
    Extract one numeric column from the rate-monitor lines of a test log.

    Only lines containing ``elapsed-secs`` are considered.  From each such
    line the space-separated fields 3, 4 and 6 are kept, and every run of
    digits found in them counts as one column.  The file is parsed directly
    in Python, so the filename is never interpreted by a shell.

    :param filename: Path of the log file to read.
    :param col_num: 1-based number of the column to extract from each line.
    :return: List of ints, one per matching line, in file order.
    :raises ValueError: If no line matches, or if a matching line has fewer
        than ``col_num`` columns.
    """
    import re

    col_index = int(col_num) - 1
    dim = []
    with open(filename) as log_file:
        for line in log_file:
            if 'elapsed-secs' not in line:
                continue
            fields = line.rstrip('\r\n').split(' ')
            # Fields 3, 4 and 6 (1-based); shorter lines contribute what they have.
            selected = ' '.join(fields[i] for i in (2, 3, 5) if i < len(fields))
            columns = re.findall(r'[0-9]+', selected)
            if col_index < 0 or col_index >= len(columns):
                raise ValueError('no column %s in line %r of %s' % (col_num, line, filename))
            dim.append(int(columns[col_index]))
    if not dim:
        raise ValueError('no elapsed-secs lines found in %s' % (filename,))
    return dim
# Marker spacing handed to plt.plot below; presumably mirrors the 5-second
# reporting interval of the benchmark tool -- TODO confirm.
frequency = 5

# Largest x/y values seen over all logs; only needed by the optional fixed
# axis limits at the bottom.
max_x = 0
max_y = 0

# rc settings only affect artists created afterwards, so the line width and
# the color/linestyle cycle must be configured before anything is plotted.
plt.rc('lines', linewidth=4)
plt.rc('axes', prop_cycle=cycler('color', 'rbgykcm') * cycler('linestyle', ['-', '--', '-.', ':']))

# Sorted so that the legend order and the style assigned to each log are the
# same on every run.
for f in sorted(glob.glob('test-logs/*')):
    # Column 2 is elapsed seconds, column 3 is the total number of documents.
    x = get_col_dim(f, 2)
    y = get_col_dim(f, 3)

    max_x = max([max_x] + list(x))
    max_y = max([max_y] + list(y))

    x = np.array(x)
    y = np.array(y)

    # 300 is the number of points to interpolate between x.min() and x.max().
    xnew = np.linspace(x.min(), x.max(), 300)
    y_smooth = spline(x, y, xnew)

    plt.plot(xnew, y_smooth, label=f.split('/', 1)[1], markevery=frequency)

# Optional: fix the axis limits slightly beyond the largest values seen.
#plt.axis([0, int(1.1 * max_x), 0, int(1.1 * max_y)])

plt.legend()
plt.show()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment