Skip to content

Instantly share code, notes, and snippets.

@ohurvitz
Created May 5, 2014 19:59
Show Gist options
  • Save ohurvitz/e5d74ae56d8ffa20e968 to your computer and use it in GitHub Desktop.
Save ohurvitz/e5d74ae56d8ffa20e968 to your computer and use it in GitHub Desktop.
Stress test code for InfluxDB
package main
import (
"fmt"
"net/http"
"io/ioutil"
"bytes"
"encoding/json"
"flag"
"strings"
"net/url"
"sync"
"time"
"strconv"
)
// dbServer is the "address:port" of a database server. Its methods build
// "http://<address:port><path>" URLs from it and issue HTTP requests there.
type dbServer string
// post sends data as an "application/json" POST request to path on the
// server, adding the fixed query parameters u=root and p=root.
//
// It returns nil for any 2xx status. For any other status the response body
// is included in the returned error. Errors from issuing the request are
// returned unchanged.
func (s dbServer) post(path string, data []byte) error {
	url := fmt.Sprintf("http://%s%s?u=root&p=root", s, path)
	resp, err := http.Post(url, "application/json", bytes.NewBuffer(data))
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	// Read the body to EOF on every path, including success: net/http can
	// only reuse the underlying keep-alive connection when the body has been
	// both fully read and closed.
	body, readErr := ioutil.ReadAll(resp.Body)
	if resp.StatusCode >= 200 && resp.StatusCode < 300 {
		return nil
	}
	if readErr != nil {
		return readErr
	}
	return fmt.Errorf("Server returned (%d): %s", resp.StatusCode, string(body))
}
// postJson encodes payload as JSON and hands the bytes to post for path.
// An encoding failure is returned without contacting the server; otherwise
// the result of post is returned.
func (s dbServer) postJson(path string, payload interface{}) error {
	encoded, marshalErr := json.Marshal(payload)
	if marshalErr == nil {
		return s.post(path, encoded)
	}
	return marshalErr
}
// delete issues an HTTP DELETE for path on the server, adding the fixed
// query parameters u=root and p=root.
//
// It returns nil for any 2xx status. For any other status the response body
// is included in the returned error. Errors from building or issuing the
// request are returned unchanged.
func (s dbServer) delete(path string) error {
	url := fmt.Sprintf("http://%s%s?u=root&p=root", s, path)
	req, err := http.NewRequest("DELETE", url, nil)
	if err != nil {
		return err
	}
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	// Read the body to EOF on every path, including success: net/http can
	// only reuse the underlying keep-alive connection when the body has been
	// both fully read and closed.
	body, readErr := ioutil.ReadAll(resp.Body)
	if resp.StatusCode >= 200 && resp.StatusCode < 300 {
		return nil
	}
	if readErr != nil {
		return readErr
	}
	return fmt.Errorf("Server returned (%d): %s", resp.StatusCode, string(body))
}
// dbResponse is one series entry of a query result; query decodes the JSON
// response body into a slice of these.
type dbResponse struct {
// Name is the series name.
Name string
// Columns lists the column names; a name's position here is the index of
// that column's value within each row of Points.
Columns []string
// Points holds one row per point, each row a slice of integer values.
Points [][]int
}
// query performs an HTTP GET on path with the URL-escaped query passed in the
// q parameter (alongside the fixed u=root and p=root parameters) and decodes
// the JSON response body into a slice of series.
//
// A non-2xx status yields an error carrying the status code and response
// body; request, read and decode failures are returned unchanged. The slice
// is nil whenever an error is returned.
func (s dbServer) query(path, query string) ([]*dbResponse, error) {
	endpoint := fmt.Sprintf("http://%s%s?u=root&p=root&q=%s", s, path, url.QueryEscape(query))
	resp, err := http.Get(endpoint)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()

	// The body is needed on both paths: as the error text for a failed
	// request, or as the JSON document for a successful one.
	payload, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		return nil, err
	}
	if resp.StatusCode < 200 || resp.StatusCode >= 300 {
		return nil, fmt.Errorf("Server returned (%d): %s", resp.StatusCode, string(payload))
	}

	result := make([]*dbResponse, 0)
	if err := json.Unmarshal(payload, &result); err != nil {
		return nil, err
	}
	return result, nil
}
// createDatabase drops the "ingest" database and then creates it again,
// printing a progress line before each step. Only the outcome of the create
// request is returned.
func (s dbServer) createDatabase() error {
	fmt.Println("deleting database")
	// The result of the delete is intentionally discarded; whatever it
	// returns, the create request below is still attempted.
	_ = s.delete("/db/ingest")

	fmt.Println("creating database")
	request := map[string]string{"name": "ingest"}
	return s.postJson("/db", request)
}
// pointJson renders a single-point series object as a JSON string with the
// columns "time" and "value":
//
//	{"name":<series>,"columns":["time","value"],"points":[[<time>,<data>]]}
//
// The series name is encoded as a JSON string, so names containing quotes,
// backslashes or control characters still yield a valid document. For plain
// names the output is the name wrapped in double quotes.
func pointJson(series string, time, data int) string {
	// Marshalling a string value does not fail, so the error is not checked.
	name, _ := json.Marshal(series)
	format := `{"name":%s,"columns":["time","value"],"points":[[%d,%d]]}`
	return fmt.Sprintf(format, name, time, data)
}
// writeData generates numberOfPoints points for each of numberOfSeries series
// and posts them to "/db/ingest/series" in JSON-array batches of at most
// batchSize points, using numberOfSenders concurrent sender goroutines.
//
// Each sender posts to the address obtained by replacing the first "1" in db
// with its server number (1..numberOfServers, assigned round-robin). A sender
// stops at its first failed post. After all senders have finished, one of the
// recorded errors is returned, or nil if there were none.
func writeData(db dbServer) error {
	fmt.Println("writing data")

	queue := make(chan []string, numberOfSeries)
	failures := make(chan error, numberOfSenders)
	var senders sync.WaitGroup

	// sender posts queued batches to target until the queue is closed and
	// drained, or until a post fails.
	sender := func(target dbServer) {
		defer senders.Done()
		for batch := range queue {
			body := "[" + strings.Join(batch, ",") + "]"
			if err := target.post("/db/ingest/series", []byte(body)); err != nil {
				failures <- err
				fmt.Println(err)
				return
			}
			// time.Sleep(1 * time.Millisecond)
		}
	}
	for i := 0; i < numberOfSenders; i++ {
		serverNumber := strconv.Itoa(i%numberOfServers + 1)
		target := dbServer(strings.Replace(string(db), "1", serverNumber, 1))
		senders.Add(1)
		go sender(target)
	}

	pending := make([]string, 0, batchSize)
	// flush queues the pending batch and starts a fresh one.
	flush := func() {
		queue <- pending
		pending = make([]string, 0, batchSize)
	}
	for index := 0; index < numberOfPoints; index++ {
		timestamp := (timeBase + index) * 1000
		for s := 0; s < numberOfSeries; s++ {
			point := pointJson(fmt.Sprintf("series.%d", s), timestamp, index)
			pending = append(pending, point)
			if len(pending) == cap(pending) {
				flush()
			}
		}
		// A batch never mixes points from two different point indexes.
		if len(pending) != 0 {
			flush()
		}
	}
	close(queue)
	senders.Wait()

	select {
	case err := <-failures:
		return err
	default:
		return nil
	}
}
// verifyData reads back the series written by writeData and checks them.
//
// It runs ten queries against "/db/ingest/series", one per final digit 0-9
// of the series name. Every returned series must contain exactly
// numberOfPoints points, and point i must have value i and time
// (timeBase+i)*1000. Across all ten queries exactly numberOfSeries series
// must be returned. The first violation, or the first query error, is
// returned; nil means everything matched.
func verifyData(db dbServer) error {
	fmt.Println("verifying data")
	counter := 0
	for d := 0; d < 10; d++ {
		query := fmt.Sprintf("select time, value from /series\\..*%d$/ order asc", d)
		response, err := db.query("/db/ingest/series", query)
		if err != nil {
			return err
		}
		counter += len(response)
		for _, series := range response {
			if err := verifySeries(series); err != nil {
				return err
			}
		}
	}
	if counter != numberOfSeries {
		return fmt.Errorf("Missing series in query, expected %d but got %d", numberOfSeries, counter)
	}
	return nil
}

// verifySeries checks one returned series against the expected point count,
// values and times.
func verifySeries(series *dbResponse) error {
	// Locate the two columns by name; their order in the response is not
	// assumed.
	timeIndex, valueIndex := -1, -1
	for i, name := range series.Columns {
		switch name {
		case "time":
			timeIndex = i
		case "value":
			valueIndex = i
		}
	}
	if timeIndex < 0 {
		return fmt.Errorf("can't find time column")
	}
	if valueIndex < 0 {
		return fmt.Errorf("can't find value column")
	}
	// Report the number of points received, not the points themselves.
	if len(series.Points) != numberOfPoints {
		return fmt.Errorf("Missing points: expected %d but got %d", numberOfPoints, len(series.Points))
	}
	for i, row := range series.Points {
		// A row shorter than the column list would otherwise panic on the
		// index expressions below.
		if timeIndex >= len(row) || valueIndex >= len(row) {
			return fmt.Errorf("Bad point: row %d has %d values but %d columns", i, len(row), len(series.Columns))
		}
		if v := row[valueIndex]; i != v {
			return fmt.Errorf("Bad point match: expected %d but got %d", i, v)
		}
		expectedTime := (timeBase + i) * 1000
		if t := row[timeIndex]; expectedTime != t {
			return fmt.Errorf("Bad time match: expected %d but got %d", expectedTime, t)
		}
	}
	return nil
}
// Parameters of the stress run.
const (
	// numberOfSeries is how many series ("series.0", "series.1", ...) are
	// written and expected back.
	numberOfSeries = 25000
	// numberOfPoints is how many points each series receives.
	numberOfPoints = 5
	// timeBase is the timestamp of the first point, in seconds; point i is
	// written with time (timeBase+i)*1000.
	timeBase = 1399035078 // 05/02/2014 @ 12:51pm
	// batchSize is the maximum number of points sent in one request.
	batchSize = 5000
	// numberOfSenders is how many goroutines post batches concurrently.
	numberOfSenders = 10
	// numberOfServers is how many server addresses the senders are spread
	// across.
	numberOfServers = 4
)
// main parses the -write_data and -server flags, optionally recreates the
// database and writes the test data (pausing ten seconds before the write and
// again before verification), then verifies the stored data. Each failure is
// printed and ends the run; "All well" is printed on success.
func main() {
	recreate := flag.Bool("write_data", false, "should we (re)create the database and write data")
	address := flag.String("server", "", "server to connect to (address:port)")
	flag.Parse()

	db := dbServer(*address)
	if db == "" {
		fmt.Println("missing database connection string")
		return
	}

	if *recreate {
		err := db.createDatabase()
		if err != nil {
			fmt.Println("can't create database:", err)
			return
		}

		fmt.Println("Sleeping before write")
		time.Sleep(10 * time.Second)
		err = writeData(db)
		if err != nil {
			fmt.Println("error writing series:", err)
			return
		}

		fmt.Println("Sleeping before verify")
		time.Sleep(10 * time.Second)
		fmt.Println("waiting for server sync")
	}

	err := verifyData(db)
	if err != nil {
		fmt.Println("error verifying series:", err)
		return
	}
	fmt.Println("All well")
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment