Created
May 5, 2014 19:59
-
-
Save ohurvitz/e5d74ae56d8ffa20e968 to your computer and use it in GitHub Desktop.
Stress code for influxdb
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"fmt" | |
"net/http" | |
"io/ioutil" | |
"bytes" | |
"encoding/json" | |
"flag" | |
"strings" | |
"net/url" | |
"sync" | |
"time" | |
"strconv" | |
) | |
type dbServer string | |
func (s dbServer) post(path string, data []byte) error { | |
url := fmt.Sprintf("http://%s%s?u=root&p=root", s, path) | |
resp, err := http.Post(url, "application/json", bytes.NewBuffer(data)) | |
if err != nil { | |
return err | |
} | |
defer resp.Body.Close() | |
if resp.StatusCode >= 200 && resp.StatusCode < 300 { | |
return nil | |
} | |
body, err := ioutil.ReadAll(resp.Body) | |
if err != nil { | |
return err | |
} | |
return fmt.Errorf("Server returned (%d): %s", resp.StatusCode, string(body)) | |
} | |
func (s dbServer) postJson(path string, payload interface{}) error { | |
data, err := json.Marshal(payload) | |
if err != nil { | |
return err | |
} | |
return s.post(path, data) | |
} | |
func (s dbServer) delete(path string) error { | |
url := fmt.Sprintf("http://%s%s?u=root&p=root", s, path) | |
req, err := http.NewRequest("DELETE", url, nil) | |
if err != nil { | |
return err | |
} | |
resp, err := http.DefaultClient.Do(req) | |
if err != nil { | |
return err | |
} | |
defer resp.Body.Close() | |
if resp.StatusCode >= 200 && resp.StatusCode < 300 { | |
return nil | |
} | |
body, err := ioutil.ReadAll(resp.Body) | |
if err != nil { | |
return err | |
} | |
return fmt.Errorf("Server returned (%d): %s", resp.StatusCode, string(body)) | |
} | |
type dbResponse struct { | |
Name string | |
Columns []string | |
Points [][]int | |
} | |
func (s dbServer) query(path, query string) ([]*dbResponse, error) { | |
escapedQuery := url.QueryEscape(query) | |
url := fmt.Sprintf("http://%s%s?u=root&p=root&q=%s", s, path, escapedQuery) | |
resp, err := http.Get(url) | |
if err != nil { | |
return nil, err | |
} | |
defer resp.Body.Close() | |
if !(resp.StatusCode >= 200 && resp.StatusCode < 300) { | |
body, err := ioutil.ReadAll(resp.Body) | |
if err != nil { | |
return nil, err | |
} | |
return nil, fmt.Errorf("Server returned (%d): %s", resp.StatusCode, string(body)) | |
} | |
data, err := ioutil.ReadAll(resp.Body) | |
if err != nil { | |
return nil, err | |
} | |
series := []*dbResponse{} | |
err = json.Unmarshal(data, &series) | |
if err != nil { | |
return nil, err | |
} | |
return series, nil | |
} | |
func (s dbServer) createDatabase() error { | |
fmt.Println("deleting database") | |
s.delete("/db/ingest") | |
fmt.Println("creating database") | |
return s.postJson("/db", map[string]string{"name": "ingest"}) | |
} | |
func pointJson(series string, time, data int) string { | |
format := `{"name":"%s","columns":["time","value"],"points":[[%d,%d]]}` | |
return fmt.Sprintf(format, series, time, data) | |
} | |
func writeData(db dbServer) error { | |
fmt.Println("writing data") | |
index := 0 | |
batches := make(chan []string, numberOfSeries) | |
errors := make(chan error, numberOfSenders) | |
wg := sync.WaitGroup{} | |
for i := 0; i < numberOfSenders; i ++ { | |
wg.Add(1) | |
newDb := dbServer(strings.Replace(string(db), "1", strconv.Itoa(i%numberOfServers + 1), 1)) | |
go func (db dbServer) { | |
defer wg.Done() | |
for { | |
batch := <- batches | |
if batch == nil { | |
return | |
} | |
message := "[" + strings.Join(batch, ",") + "]" | |
err := db.post("/db/ingest/series", []byte(message)) | |
if err != nil { | |
errors <- err | |
fmt.Println(err) | |
return | |
} | |
// time.Sleep(1 * time.Millisecond) | |
} | |
}(newDb) | |
} | |
series := make([]string, 0, batchSize) | |
for p := 0; p < numberOfPoints; p++ { | |
for s:=0; s < numberOfSeries; s ++ { | |
name := fmt.Sprintf("series.%d", s) | |
series = append(series, pointJson(name, (timeBase + index) * 1000, index)) | |
if cap(series) == len(series) { | |
batches <- series | |
series = make([]string, 0, batchSize) | |
} | |
} | |
if len(series) != 0 { | |
batches <- series | |
series = make([]string, 0, batchSize) | |
} | |
index ++ | |
} | |
close(batches) | |
wg.Wait() | |
select { | |
case err := <- errors: | |
return err | |
default: | |
return nil | |
} | |
} | |
func verifyData(db dbServer) error { | |
fmt.Println("verifying data") | |
counter := 0 | |
for d := 0; d < 10; d ++ { | |
query := fmt.Sprintf("select time, value from /series\\..*%d$/ order asc", d) | |
response, err := db.query("/db/ingest/series", query) | |
if err != nil { | |
return err | |
} | |
counter += len(response) | |
for _, series := range response { | |
column_map := map[string]int{} | |
for i, name := range series.Columns { | |
column_map[name] = i | |
} | |
time_index, ok := column_map["time"] | |
if !ok { | |
return fmt.Errorf("can't find time column") | |
} | |
value_index, ok := column_map["value"] | |
if !ok { | |
return fmt.Errorf("can't find value column") | |
} | |
if len(series.Points) != numberOfPoints { | |
return fmt.Errorf("Missing points: expected %d but got %d", numberOfPoints, series.Points) | |
} | |
for i := 0; i < numberOfPoints; i ++ { | |
t := series.Points[i][time_index] | |
v := series.Points[i][value_index] | |
if i != v { | |
return fmt.Errorf("Bad point match: expected %d but got %d", i, v) | |
} | |
expectedTime := (timeBase + i) * 1000 | |
if expectedTime != t { | |
return fmt.Errorf("Bad time match: expected %d but got %d", expectedTime, t) | |
} | |
} | |
} | |
} | |
if counter != numberOfSeries { | |
return fmt.Errorf("Missing series in query, expected %d but got %d", numberOfSeries, counter) | |
} | |
return nil | |
} | |
const numberOfSeries = 25000 | |
const numberOfPoints = 5 | |
const timeBase = 1399035078 // 05/02/2014 @ 12:51pm | |
const batchSize = 5000 | |
const numberOfSenders = 10 | |
const numberOfServers = 4 | |
func main() { | |
write_data := flag.Bool("write_data", false, "should we (re)create the database and write data") | |
server_connect := flag.String("server", "", "server to connect to (address:port)") | |
flag.Parse() | |
var db dbServer = dbServer(*server_connect) | |
if db == "" { | |
fmt.Println("missing database connection string") | |
return | |
} | |
if *write_data { | |
if err := db.createDatabase(); err != nil { | |
fmt.Println("can't create database:", err) | |
return | |
} | |
fmt.Println("Sleeping before write") | |
time.Sleep(10 * time.Second) | |
if err := writeData(db); err != nil { | |
fmt.Println("error writing series:", err) | |
return | |
} | |
fmt.Println("Sleeping before verify") | |
time.Sleep(10 * time.Second) | |
fmt.Println("waiting for server sync") | |
} | |
if err := verifyData(db); err != nil { | |
fmt.Println("error verifying series:", err) | |
return | |
} | |
fmt.Println("All well") | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment