Skip to content

Instantly share code, notes, and snippets.

@pauldix
Created June 20, 2016 17:53
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save pauldix/616fef6052f499b466c312b3100747b9 to your computer and use it in GitHub Desktop.
Save pauldix/616fef6052f499b466c312b3100747b9 to your computer and use it in GitHub Desktop.
package main
import (
"bufio"
"flag"
"fmt"
"github.com/influxdata/influxdb/client"
"math/rand"
"net/url"
"os"
"strings"
"sync"
"syscall"
"time"
)
// Command-line flag targets; populated by flag.Parse in main.
var (
	userVar  string
	hostsVar string
)

// tags is the tag set attached to every point this program writes.
var tags = map[string]string{
	"foo": "bar",
}
// Write-side settings.
const (
	// valuesPerPost is the number of points placed in each write request.
	valuesPerPost = 100
	// outputAfterPosts is how many write requests pass between progress lines.
	outputAfterPosts = 10
)

// Query-side settings.
const (
	// queriesPerPeriod is the upper bound on queries issued in one period.
	queriesPerPeriod = 50
	// pauseBetweenPeriod is the sleep between two query periods.
	pauseBetweenPeriod = time.Second
	// outputAfterPeriods is how many query periods pass between progress lines.
	outputAfterPeriods = 5
)

// Names of the measurement and field used for all writes and queries.
const (
	measurement = "thing"
	fieldName   = "val"
)
// main parses the -user and -hosts flags, builds one InfluxDB client per host,
// seeds the demo database through the first client, and then starts one writer
// and one query goroutine per client. On the success path it blocks forever.
//
// When -user is set, the password is read interactively via getPassword.
// Host entries are trimmed of surrounding whitespace and empty entries are
// skipped, so "a:8086, b:8086" and a trailing comma are both accepted. If no
// usable host remains, or a host cannot be turned into a client, usage is
// printed and the process exits with status 1.
func main() {
	fmt.Println("starting")
	flag.StringVar(&userVar, "user", "", "InfluxDB user to make requests to database. You will be prompted to enter the password.")
	flag.StringVar(&hostsVar, "hosts", "localhost:8086", "Comma separated list of <host>:<port> of hosts to write and query during test")
	flag.Parse()

	var password string
	if userVar != "" {
		password = getPassword("Enter password: ")
	}

	var clients []*client.Client
	for _, h := range strings.Split(hostsVar, ",") {
		// Tolerate "a, b" and stray commas instead of building a bogus URL.
		h = strings.TrimSpace(h)
		if h == "" {
			continue
		}

		config := client.NewConfig()
		if userVar != "" {
			config.Username = userVar
			config.Password = password
		}
		u, err := url.Parse(fmt.Sprintf("http://%s", h))
		if err != nil {
			fmt.Println(err.Error())
			flag.Usage()
			os.Exit(1)
		}
		config.URL = *u
		c, err := client.NewClient(config)
		if err != nil {
			fmt.Println(err)
			flag.Usage()
			os.Exit(1)
		}
		clients = append(clients, c)
	}

	// clients[0] is used below, so an empty list must be rejected here.
	if len(clients) == 0 {
		fmt.Println("no hosts given")
		flag.Usage()
		os.Exit(1)
	}

	if err := writeHistorical(clients[0]); err != nil {
		fmt.Println(err.Error())
		os.Exit(1)
	}

	for _, c := range clients {
		go write(c)
		go query(c)
	}

	// Block forever: the counter is never decremented, and the worker
	// goroutines above never return.
	var wg sync.WaitGroup
	wg.Add(1)
	wg.Wait()
}
// writeHistorical sets up the demo: it issues the CREATE DATABASE statement
// for "demo" (3d retention, 1h shard duration, policy named "default") and
// then backfills the last five hours with one point every 100ms, sent as a
// single batch, so that the data spans several shards.
//
// It returns the first error from the create query (transport error or
// statement error) or from the batch write, and nil otherwise.
func writeHistorical(c *client.Client) error {
	createDB := client.Query{
		Command: `CREATE DATABASE demo WITH DURATION 3d REPLICATION 1 SHARD DURATION 1h NAME "default"`,
	}
	resp, err := c.Query(createDB)
	if err != nil {
		return err
	}
	if resp.Error() != nil {
		return resp.Error()
	}
	fmt.Println("demo database created")

	// Five hours at a 1h shard duration yields multiple shards for the demo.
	batch := client.BatchPoints{Database: "demo"}
	for ts := time.Now().Add(-5 * time.Hour); ts.Before(time.Now()); ts = ts.Add(100 * time.Millisecond) {
		point := client.Point{
			Measurement: measurement,
			Tags:        tags,
			Fields:      map[string]interface{}{fieldName: 1},
			Time:        ts,
		}
		batch.Points = append(batch.Points, point)
	}

	_, err = c.Write(batch)
	if err != nil {
		return err
	}
	fmt.Printf("wrote %d historical points\n", len(batch.Points))
	return nil
}
// write posts batches of points to the "demo" database through c forever.
//
// Each request carries valuesPerPost points stamped with the current time,
// with field values 0..valuesPerPost-1. A failed request is only counted, not
// retried or reported individually. After every outputAfterPosts requests a
// summary line is printed and the counters are reset. Between requests the
// loop sleeps a random 100-999ms. This function never returns.
func write(c *client.Client) {
	runCount := 0
	errorCount := 0
	lastOutput := time.Now()
	for {
		bp := client.BatchPoints{
			Database: "demo",
		}
		for i := 0; i < valuesPerPost; i++ {
			bp.Points = append(bp.Points, client.Point{
				Measurement: measurement,
				Tags:        tags,
				Fields:      map[string]interface{}{fieldName: i},
				Time:        time.Now(),
			})
		}
		if _, err := c.Write(bp); err != nil {
			errorCount++
		}
		runCount++

		if runCount == outputAfterPosts {
			// Only requests that succeeded contributed values; each failed
			// request loses a whole batch of valuesPerPost points.
			valuesPosted := valuesPerPost * (runCount - errorCount)
			fmt.Printf("%s: posted %d values in %d requests in %s. %d errors\n",
				c.Addr(),
				valuesPosted,
				runCount,
				time.Since(lastOutput),
				errorCount)
			lastOutput = time.Now()
			errorCount = 0
			runCount = 0
		}

		time.Sleep(time.Duration(100+rand.Intn(900)) * time.Millisecond)
	}
}
// query issues count queries against the "demo" database through c forever.
//
// Each period runs between 1 and queriesPerPeriod queries counting fieldName
// in measurement over the last 20 seconds, then sleeps pauseBetweenPeriod. A
// query counts as an error when the call fails or the response carries an
// error. After every outputAfterPeriods periods a summary line is printed,
// including a rendering of the most recent response, and the counters are
// reset. This function never returns.
func query(c *client.Client) {
	runCount := 0
	errorCount := 0
	queryCount := 0
	lastOutput := time.Now()
	for {
		// r keeps the response of the last query of this period for reporting.
		var r *client.Response
		queriesToRun := rand.Intn(queriesPerPeriod) + 1
		queryCount += queriesToRun
		for i := 0; i < queriesToRun; i++ {
			q := client.Query{
				Command:  fmt.Sprintf("select count(%s) from \"demo\".\"default\".\"%s\" where time > now() - 20s", fieldName, measurement),
				Database: "demo",
			}
			var err error
			r, err = c.Query(q)
			if err != nil || r.Error() != nil {
				errorCount++
			}
		}
		runCount++

		if runCount == outputAfterPeriods {
			fmt.Printf("%s: queried %d times with %d errors in %s. last count: %s\n",
				c.Addr(),
				queryCount,
				errorCount,
				time.Since(lastOutput),
				describeLastResponse(r))
			runCount = 0
			queryCount = 0
			errorCount = 0
			lastOutput = time.Now()
		}

		time.Sleep(pauseBetweenPeriod)
	}
}

// describeLastResponse renders r for the query summary line: "NaN" when there
// is no response, the error text when the response carries an error, the
// second column of the first row of the first series when present, and
// "no results" when any of those levels is missing.
func describeLastResponse(r *client.Response) string {
	if r == nil {
		return "NaN"
	}
	if err := r.Error(); err != nil {
		return err.Error()
	}
	if len(r.Results) == 0 {
		return "no results"
	}
	series := r.Results[0].Series
	// A window with no points yields no series; indexing here would panic.
	if len(series) == 0 {
		return "no results"
	}
	rows := series[0].Values
	if len(rows) == 0 || len(rows[0]) < 2 {
		return "no results"
	}
	return fmt.Sprintf("%v", rows[0][1])
}
// getPassword prints prompt, reads one line from standard input with terminal
// echo switched off, and returns that line with surrounding whitespace
// trimmed.
//
// Echo is toggled by running /bin/stty. A failure to run stty or to read the
// line panics. Echo is switched back on via defer, so a failed read does not
// leave the terminal with echo disabled.
func getPassword(prompt string) string {
	fmt.Print(prompt)

	setTerminalEcho(false)
	// Runs on normal return and while unwinding from the panic below.
	defer setTerminalEcho(true)

	reader := bufio.NewReader(os.Stdin)
	text, err := reader.ReadString('\n')
	if err != nil {
		panic(err)
	}
	return strings.TrimSpace(text)
}

// setTerminalEcho runs "/bin/stty echo" (enabled) or "/bin/stty -echo"
// (disabled) with this process's stdin, stdout and stderr and an empty
// environment, and waits for it to exit. It panics if the process cannot be
// started or waited for.
func setTerminalEcho(enabled bool) {
	arg := "-echo"
	if enabled {
		arg = "echo"
	}

	attrs := syscall.ProcAttr{
		Dir:   "",
		Env:   []string{},
		Files: []uintptr{os.Stdin.Fd(), os.Stdout.Fd(), os.Stderr.Fd()},
		Sys:   nil,
	}
	pid, err := syscall.ForkExec("/bin/stty", []string{"stty", arg}, &attrs)
	if err != nil {
		panic(err)
	}

	// Wait for the stty process to complete before touching stdin again.
	var ws syscall.WaitStatus
	if _, err := syscall.Wait4(pid, &ws, 0, nil); err != nil {
		panic(err)
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment