Skip to content

Instantly share code, notes, and snippets.

@jaytaylor
Last active November 7, 2017 18:37
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jaytaylor/997ccc3690ccd3ac5196211aff59d989 to your computer and use it in GitHub Desktop.
Save jaytaylor/997ccc3690ccd3ac5196211aff59d989 to your computer and use it in GitHub Desktop.
Kubernetes Elasticsearch indexing performance testing tool.
package main
import (
"context"
"flag"
"fmt"
"os"
"path/filepath"
"regexp"
"sync"
"sync/atomic"
"time"
"github.com/icrowley/fake"
"github.com/robfig/cron"
log "github.com/sirupsen/logrus"
"gopkg.in/olivere/elastic.v5"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
)
// init configures the package-wide logrus logger: every line carries a full
// timestamp, and messages down to debug level are emitted.
func init() {
	formatter := &log.TextFormatter{FullTimestamp: true}
	log.SetFormatter(formatter)
	log.SetLevel(log.DebugLevel)
}
const (
	// PoolChanSize is the buffer capacity used for both the job channel
	// (producer -> indexing workers) and the result channel (workers -> Start).
	PoolChanSize = 100

	// DefaultBulkIndexingBatchSize is the default for the -batch-size flag:
	// the maximum number of documents placed in one bulk indexing request.
	DefaultBulkIndexingBatchSize = 1000
)
// main wires the tool together. It parses the command-line flags, builds a
// Kubernetes clientset from the kubeconfig's current context, and resolves
// the Elasticsearch URLs of the pods whose names match -pod-expr as a
// preliminary check. It then hands a URL provider and the job description to
// Start. Every failure along the way panics.
func main() {
	kubeconfigDefault, kubeconfigHelp := "", "absolute path to the kubeconfig file"
	if home := homeDir(); home != "" {
		kubeconfigDefault = filepath.Join(home, ".kube", "config")
		kubeconfigHelp = "(optional) absolute path to the kubeconfig file"
	}
	var (
		kubeconfig            = flag.String("kubeconfig", kubeconfigDefault, kubeconfigHelp)
		podExpr               = flag.String("pod-expr", ".*", "pod name regular expression to match")
		numIndexes            = flag.Int("num-indexes", 0, "number of indexes to create")
		startIndexOffset      = flag.Int("start-index-offset", 0, "index offset to begin at (for partial runs)")
		numDocuments          = flag.Int("num-documents", 1, "number of documents to push to each index")
		workerMultiplier      = flag.Int("worker-multiplier", 1, "multiplier for how many workers to launch relative to matched pods")
		bulkIndexingBatchSize = flag.Int("batch-size", DefaultBulkIndexingBatchSize, "number of docs for each bulk indexing batch")
	)
	flag.Parse()

	// An empty master URL means the kubeconfig's current context is used.
	restConfig, err := clientcmd.BuildConfigFromFlags("", *kubeconfig)
	if err != nil {
		panic(err)
	}
	clientset, err := kubernetes.NewForConfig(restConfig)
	if err != nil {
		panic(err)
	}
	pods := &PodUtil{Clientset: clientset}

	// Fail fast if no matching pod can be found; Start resolves the URLs
	// again through urlsProvider.
	urls, err := pods.PodURLs(*podExpr)
	if err != nil {
		panic(err)
	}
	log.Infof("Found %v ES URLs in k8s pod in preliminary check: %+v", len(urls), urls)

	urlsProvider := func() ([]string, error) {
		return pods.PodURLs(*podExpr)
	}
	job := JobInfo{
		NumIndexes:            *numIndexes,
		StartIndexOffset:      *startIndexOffset,
		NumDocuments:          *numDocuments,
		WorkerMultiplier:      *workerMultiplier,
		BulkIndexingBatchSize: *bulkIndexingBatchSize,
	}
	if err := Start(urlsProvider, job); err != nil {
		panic(err)
	}
}
type (
	// JobInfo describes one benchmark run as configured on the command line.
	JobInfo struct {
		// NumIndexes is the exclusive upper bound on index numbers: indexes
		// StartIndexOffset .. NumIndexes-1 are written.
		NumIndexes int
		// StartIndexOffset is the first index number written (for partial
		// runs).
		StartIndexOffset int
		// NumDocuments is the number of documents written to each index.
		NumDocuments int
		// WorkerMultiplier scales the number of indexing workers relative to
		// the number of Elasticsearch URLs found.
		WorkerMultiplier int
		// BulkIndexingBatchSize is the maximum number of documents per bulk
		// request.
		BulkIndexingBatchSize int
	}

	// IndexRequest is a single document destined for Index under DocType.
	IndexRequest struct {
		Index   string
		DocType string
		Doc     interface{}
	}

	// BulkIndexRequest is a batch of documents sent to Elasticsearch in one
	// bulk call.
	BulkIndexRequest struct {
		IndexRequests []*IndexRequest
	}
)
// BulkIndexResult is the outcome of one BulkIndex call.
type BulkIndexResult struct {
	// Count is the number of documents submitted in the batch.
	Count int64
	// Response is the bulk response from Elasticsearch. Start inspects its
	// Succeeded() and Failed() items when Err is nil.
	Response *elastic.BulkResponse
	// Err is set when the bulk request as a whole failed.
	Err error
}
// Start runs one benchmark job. It generates the documents described by
// jobInfo and groups them into bulk requests of at most
// jobInfo.BulkIndexingBatchSize documents. It indexes them through a pool of
// worker goroutines that share one Elasticsearch client.
//
// podESURLsProvider is called once to obtain the Elasticsearch URLs. The pool
// has len(urls)*jobInfo.WorkerMultiplier workers (at least one). Progress is
// logged every 5 seconds via RateMonitorCron.
//
// Start returns a non-nil error only if URL discovery or client creation
// fails. Indexing failures are counted and logged, and Start returns nil once
// every batch has been processed.
//
// Shutdown ordering:
//   - The producer is the sole sender on jobsCh and closes it after queueing
//     the last batch.
//   - Each worker exits once jobsCh is drained.
//   - resultsCh is closed only after all workers have exited, which ends the
//     result loop.
func Start(podESURLsProvider func() ([]string, error), jobInfo JobInfo) error {
	var (
		jobsCh     = make(chan *BulkIndexRequest, PoolChanSize)
		resultsCh  = make(chan *BulkIndexResult, PoolChanSize)
		workersWg  sync.WaitGroup
		numDocs    int64
		numIndexes int64
		numErrs    int64
		c          = RateMonitorCron(5, &numDocs, &numIndexes, &numErrs)
	)

	urls, err := podESURLsProvider()
	if err != nil {
		return err
	}
	client, err := elastic.NewClient(elastic.SetURL(urls...))
	if err != nil {
		return err
	}

	// workProducer builds batches and queues them. It closes jobsCh when
	// done so the workers know no more work is coming.
	workProducer := func() {
		defer close(jobsCh)
		log.Info("workProducer starting")
		bir := newBulkIndexRequest()
		for i := jobInfo.StartIndexOffset; i < jobInfo.NumIndexes; i++ {
			for j := 0; j < jobInfo.NumDocuments; j++ {
				bir.IndexRequests = append(bir.IndexRequests, newIndexRequest(i, genDoc(j)))
				if len(bir.IndexRequests) >= jobInfo.BulkIndexingBatchSize {
					jobsCh <- bir
					bir = newBulkIndexRequest()
				}
			}
		}
		if len(bir.IndexRequests) > 0 {
			// Flush the final, partially filled batch.
			jobsCh <- bir
		}
		log.Info("workProducer finished, shutting down")
	}

	// indexingWorker bulk-indexes batches until jobsCh is closed and drained.
	indexingWorker := func(id int) {
		defer workersWg.Done()
		log.Infof("indexingWorker id=%v starting", id)
		for bir := range jobsCh {
			resultsCh <- BulkIndex(client, bir)
		}
		log.Infof("indexingWorker id=%v shutting down", id)
	}

	go workProducer()

	numWorkers := len(urls) * jobInfo.WorkerMultiplier
	if numWorkers < 1 {
		// With no workers nothing would ever be indexed.
		numWorkers = 1
	}
	// Register every worker before any of them can finish, so Wait below
	// cannot observe a zero counter prematurely.
	workersWg.Add(numWorkers)
	for id := 1; id <= numWorkers; id++ {
		go indexingWorker(id)
	}

	go func() {
		workersWg.Wait()
		close(resultsCh)
	}()

	c.Start()
	defer c.Stop()

	seenIndexes := map[string]struct{}{}
	for result := range resultsCh {
		if result.Err != nil {
			// The whole request failed: count every document in it as an error.
			atomic.AddInt64(&numErrs, result.Count)
			log.Errorf("Result: %+v", *result)
			continue
		}
		succeeded := result.Response.Succeeded()
		atomic.AddInt64(&numDocs, int64(len(succeeded)))
		atomic.AddInt64(&numErrs, int64(len(result.Response.Failed())))
		for _, r := range succeeded {
			seenIndexes[r.Index] = struct{}{}
		}
		atomic.StoreInt64(&numIndexes, int64(len(seenIndexes)))
	}
	log.Infof("resultsCh closed, batch completed with %v errors", atomic.LoadInt64(&numErrs))
	return nil
}
// BulkIndex sends every document in bir to Elasticsearch as one bulk request
// using client.
//
// The returned result always records how many documents were submitted
// (Count). Response and Err are exactly what the bulk request's Do call
// returned. A nil Err does not imply every document succeeded; callers
// inspect Response for per-item outcomes.
func BulkIndex(client *elastic.Client, bir *BulkIndexRequest) (result *BulkIndexResult) {
	bulk := client.Bulk()
	for _, ir := range bir.IndexRequests {
		bulk.Add(elastic.NewBulkIndexRequest().
			Index(ir.Index).
			Type(ir.DocType).
			Doc(ir.Doc))
	}
	resp, err := bulk.Do(context.TODO())
	return &BulkIndexResult{
		Count:    int64(len(bir.IndexRequests)),
		Response: resp,
		Err:      err,
	}
}
// newBulkIndexRequest returns a fresh, empty batch whose IndexRequests slice
// is non-nil and ready to be appended to.
func newBulkIndexRequest() *BulkIndexRequest {
	return &BulkIndexRequest{IndexRequests: make([]*IndexRequest, 0)}
}
// newIndexRequest wraps doc in an IndexRequest for index number i. Index
// number i maps to the index name "test_index_<i>" and to the document type
// "doctype_<i>".
func newIndexRequest(i int, doc interface{}) *IndexRequest {
	req := &IndexRequest{Doc: doc}
	req.Index = fmt.Sprintf("test_index_%v", i)
	req.DocType = fmt.Sprintf("doctype_%v", i)
	return req
}
// genDoc builds one synthetic benchmark document.
//
// The caller passes the document's position within its index as j. That
// value is appended to $USER to form the "user" field. "name" and "content"
// come from the fake package (presumably randomized; not verified here).
//
// "timestamp" (Unix seconds) and "nanos" (Unix nanoseconds as a decimal
// string) are derived from a single clock reading, so they always describe
// the same instant.
func genDoc(j int) interface{} {
	now := time.Now()
	return map[string]interface{}{
		"user":      fmt.Sprintf("%v%v", os.Getenv("USER"), j),
		"name":      fake.FullName(),
		"content":   fake.Paragraph(),
		"timestamp": now.Unix(),
		"nanos":     fmt.Sprintf("%v", now.UnixNano()),
	}
}
// RateMonitorCron returns a cron scheduler that is not yet started. Every
// frequencySecs seconds it logs indexing progress from the counters, which it
// reads atomically:
//   - docs-indexing-rate and error-rate: integer per-second deltas since the
//     previous tick;
//   - total-docs, total-indexes and total-errors: running totals;
//   - elapsed-secs: seconds since this function was called.
//
// frequencySecs values below 1 are treated as 1.
//
// The log message is left empty and the rates are integers on purpose: the
// plotting script extracts numbers from these lines by field position.
func RateMonitorCron(frequencySecs int64, numDocs *int64, numIndexes *int64, numErrs *int64) *cron.Cron {
	if frequencySecs < 1 {
		// Avoids a division by zero in the rate computation below.
		frequencySecs = 1
	}
	var (
		c         = cron.New()
		createdAt = time.Now().Unix()
		// robfig/cron runs each scheduled invocation in its own goroutine,
		// so the values carried from one tick to the next are guarded by mu.
		mu       sync.Mutex
		lastDocs int64
		lastErrs int64
	)
	spec := fmt.Sprintf("@every %vs", frequencySecs)
	err := c.AddFunc(spec, func() {
		mu.Lock()
		defer mu.Unlock()
		var (
			latestDocs    = atomic.LoadInt64(numDocs)
			latestIndexes = atomic.LoadInt64(numIndexes)
			latestErrors  = atomic.LoadInt64(numErrs)
		)
		log.
			WithField("docs-indexing-rate", fmt.Sprintf("%v/s", (latestDocs-lastDocs)/frequencySecs)).
			WithField("elapsed-secs", time.Now().Unix()-createdAt).
			WithField("error-rate", fmt.Sprintf("%v/s", (latestErrors-lastErrs)/frequencySecs)).
			WithField("total-docs", latestDocs).
			WithField("total-indexes", latestIndexes).
			WithField("total-errors", latestErrors).
			Info("")
		lastDocs = latestDocs
		lastErrs = latestErrors
	})
	if err != nil {
		// Progress monitoring is best-effort; report instead of aborting.
		log.Errorf("scheduling rate monitor with spec %q: %s", spec, err)
	}
	return c
}
// PodUtil finds Kubernetes pods, and the Elasticsearch URLs they expose,
// through the wrapped clientset.
type PodUtil struct {
	Clientset *kubernetes.Clientset // used for every pod API call
}
// PodURLs returns one URL of the form "http://<pod-ip>:9200" for every pod
// whose name matches the regular expression expr (see FindPods).
//
// Pods with an empty Status.PodIP are skipped, since "http://:9200" is not a
// usable endpoint. Such pods presumably have not been assigned an IP yet.
//
// An error is returned if:
//   - the pod lookup fails, or
//   - no matching pod has an IP.
func (p *PodUtil) PodURLs(expr string) ([]string, error) {
	pods, err := p.FindPods(expr)
	if err != nil {
		return nil, err
	}
	urls := make([]string, 0, len(pods))
	for i := range pods {
		ip := pods[i].Status.PodIP
		if ip == "" {
			log.Debugf("skipping pod %q: no pod IP assigned", pods[i].Name)
			continue
		}
		urls = append(urls, fmt.Sprintf("http://%v:9200", ip))
	}
	if len(urls) == 0 {
		return nil, fmt.Errorf("no pods matching %q have a pod IP assigned", expr)
	}
	return urls, nil
}
// FindPods lists pods through the clientset with namespace "" (all
// namespaces in client-go). It returns those whose name matches the
// regular expression expr, unanchored, as with regexp.MatchString.
//
// The expression is compiled before the API call, so an invalid pattern is
// reported without contacting the cluster.
//
// An error is returned if:
//   - expr does not compile,
//   - the list request fails, or
//   - no pod name matches.
func (p *PodUtil) FindPods(expr string) ([]v1.Pod, error) {
	x, err := regexp.Compile(expr)
	if err != nil {
		return nil, fmt.Errorf("compiling regular expression %q: %s", expr, err)
	}
	pods, err := p.Clientset.CoreV1().Pods("").List(metav1.ListOptions{})
	if err != nil {
		return nil, err
	}
	var matches []v1.Pod
	// Index instead of ranging by value: v1.Pod is a large struct.
	for i := range pods.Items {
		if x.MatchString(pods.Items[i].Name) {
			matches = append(matches, pods.Items[i])
		}
	}
	if len(matches) == 0 {
		return nil, fmt.Errorf("no pods matching %q were found", expr)
	}
	return matches, nil
}
// homeDir returns the current user's home directory. It reads $HOME first,
// then falls back to %USERPROFILE% (the Windows equivalent). It returns ""
// when neither variable is set.
func homeDir() string {
	for _, key := range []string{"HOME", "USERPROFILE"} {
		if dir := os.Getenv(key); dir != "" {
			return dir
		}
	}
	return ""
}
#!/usr/bin/env bash
#
# Elasticsearch indexing benchmark driver.
#
# For each "<num-indexes> <docs-per-index>" pair in idxDocCounts below:
#   1. delete every index in the cluster, through a randomly chosen client pod;
#   2. run main.go and save its output to
#      test-logs/num-indexes-<N>_num-docs-<M>.txt.
#
# Environment overrides:
#   batchSize   documents per bulk request (default 20000)
#   clientExpr  client pod name pattern, used by grep here and as main.go's
#               -pod-expr (default k8s-.*-client.*)
#   mult        main.go -worker-multiplier (default 2)

set -o errexit
set -o pipefail
set -o nounset
set -x

command -v jq || ( echo 'error: jq is required not available in $PATH - "sudo yum install jq", perhaps?' 1>&2 && exit 1 )

cd "$(dirname "$0")"

batchSize=${batchSize:-20000}
clientExpr="${clientExpr:-k8s-.*-client.*}"
mult=${mult:-2}

# Every pair totals 2,000,000 documents, spread over more or fewer indexes.
idxDocCounts='100 20000
500 4000
1000 2000
2000 1000
4000 500
8000 250'

mkdir -p test-logs

# Pairs are read from fd 3 so that commands inside the loop cannot consume
# them from stdin. This also avoids changing (and restoring) IFS.
while read -r numIdx numDocs <&3 ; do
    # Purge all data from the cluster.
    #clientAddr="$(kubectl get pods -o wide | grep 'k8s-elasticsearch-.*-client-' | sed 's/ \+/ /g' | cut -d ' ' -f 6 | shuf -n 1)"
    # podIP may be absent (null) as well as empty before a pod gets an IP;
    # `// ""` maps null to "" so both cases are filtered out.
    clientAddr="$(
        kubectl get pods -o json \
            | jq -r '.items[] | select((.status.podIP // "") != "") | "\(.metadata.name) \(.status.podIP)"' \
            | grep "${clientExpr}" \
            | cut -d ' ' -f 2 \
            | shuf -n 1 \
    )"
    curl -sS -XDELETE "http://${clientAddr}:9200/*" && echo ''

    go run main.go \
        -pod-expr "${clientExpr}" \
        -num-documents="${numDocs}" \
        -num-indexes="${numIdx}" \
        -start-index-offset=0 \
        -batch-size="${batchSize}" \
        -worker-multiplier="${mult}" 2>&1 \
        | tee "test-logs/num-indexes-${numIdx}_num-docs-${numDocs}.txt"
done 3<<< "${idxDocCounts}"
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Elasticsearch performance graphing
--------------------------------------------------------------------------------
If running this script on macOS gives no output or an error like:
**RuntimeError**: Python is not installed as a framework. The Mac OS X
backend will not be able to function correctly if Python is not installed
as a framework. See the Python documentation for more information on
installing Python as a framework on Mac OS X. Please either reinstall Python
as a framework, or try one of the other backends.
Then do this:
Create file ~/.matplotlib/matplotlibrc with the line:
backend: TkAgg
---
Also see:
https://stackoverflow.com/questions/21784641/installation-issue-with-matplotlib-python
"""
import glob
import os
import re
import subprocess
#from itertools import cycle

from cycler import cycler
import matplotlib.lines as mlines
import matplotlib.pyplot as plt
import numpy as np
from scipy.interpolate import make_interp_spline
try:
    # `spline` was removed in SciPy 1.3; kept only for older installations.
    from scipy.interpolate import spline
except ImportError:
    spline = None
## TODO:
## Automatically set a working backend if none is configured when run
#open(os.getenv('HOME') + '/.matplotlib/matplotlibrc
## Data for plotting
#t = np.arange(0.0, 2.0, 0.3)
#s = 1 + np.sin(2 * np.pi * t)
#
## Note that using plt.subplots below is equivalent to using
## fig = plt.figure and then ax = fig.add_subplot(111)
#fig, ax = plt.subplots()
#ax.plot(t, s)
#
#ax.set(xlabel='time (s)', ylabel='voltage (mV)',
# title='About as simple as it gets, folks')
#ax.grid()
#
#fig.savefig("test.png")
#plt.show()
def get_col_dim(filename, col_num):
    """Extract one numeric column from the rate-monitor lines of a log file.

    This reproduces the shell pipeline
    ``grep elapsed-secs | cut -d ' ' -f 3,4,6 | sed 's/[^0-9]\\{1,\\}/ /g' | awk '{ print $N }'``
    in pure Python. It works under Python 2 and 3 and does not spawn a shell
    with an unquoted filename. For every line containing ``elapsed-secs``:

    1. the 3rd, 4th and 6th single-space-separated fields are kept;
    2. every run of non-digit characters becomes a separator;
    3. the ``col_num``-th remaining number is taken.

    With the Go tool's logrus output, columns 1, 2 and 3 are expected to be
    the indexing rate, elapsed seconds and total docs. This assumes the
    default text layout with an empty message; TODO confirm if the log
    format changes.

    Args:
        filename: path of the log file to read.
        col_num: 1-based column number to extract.

    Returns:
        A list of ints, one per matching line, in file order. The list is
        empty if no line matches.

    Raises:
        ValueError: if a matching line contains fewer than ``col_num`` numbers.
    """
    values = []
    with open(filename) as fh:
        for line in fh:
            line = line.rstrip('\r\n')
            if 'elapsed-secs' not in line:
                continue
            fields = line.split(' ')
            # Like `cut`, a line without the delimiter is passed through
            # whole, and fields past the end of the line are simply omitted.
            if len(fields) > 1:
                line = ' '.join(fields[i] for i in (2, 3, 5) if i < len(fields))
            numbers = re.sub(r'[^0-9]+', ' ', line).split()
            if len(numbers) < col_num:
                raise ValueError('%s: no column %d in line %r' % (filename, col_num, line))
            values.append(int(numbers[col_num - 1]))
    return values
# Plot total documents indexed (y) against elapsed seconds (x), one line per
# benchmark log in test-logs/. Each line is smoothed with a cubic B-spline
# when the data allow it.

# Style must be configured before plotting: rc settings are read when axes
# and lines are created, so changes made afterwards do not affect them.
plt.rc('lines', linewidth=4)
plt.rc('axes', prop_cycle=cycler('color', 'rbgykcm') * cycler('linestyle', ['-', '--', '-.', ':']))

for f in sorted(glob.glob('test-logs/*')):
    x = get_col_dim(f, 2)  # elapsed-secs
    y = get_col_dim(f, 3)  # total-docs
    if not x or not y:
        # e.g. a run that ended before its first progress report
        continue
    x = np.array(x)
    y = np.array(y)
    label = f.split('/', 1)[1]
    # A cubic (k=3) interpolating spline needs at least 4 strictly
    # increasing x values; otherwise plot the raw samples.
    if len(x) >= 4 and np.all(np.diff(x) > 0):
        xnew = np.linspace(x.min(), x.max(), 300)  # 300 evenly spaced points between x.min() and x.max()
        y_smooth = make_interp_spline(x, y, k=3)(xnew)
        plt.plot(xnew, y_smooth, label=label)
    else:
        plt.plot(x, y, label=label)

plt.legend()
plt.show()
#plt.plot([1, 2, 3, 4], [1, 4, 9, 16])
##fig.savefig("test.png")
#plt.show()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment