Skip to content

Instantly share code, notes, and snippets.

@YuriyNasretdinov
Created October 23, 2021 18:51
Show Gist options
  • Save YuriyNasretdinov/be43cf200c2c3a160f13e3b5d784aa3c to your computer and use it in GitHub Desktop.
Save YuriyNasretdinov/be43cf200c2c3a160f13e3b5d784aa3c to your computer and use it in GitHub Desktop.
package main
import (
"flag"
"log"
"time"
"sync"
"sync/atomic"
"github.com/valyala/fasthttp"
)
// Command-line flags configuring the benchmark. Each variable is a pointer
// populated by flag.Parse, which main calls first.
var (
	// addr is the host:port the INSERT requests are sent to.
	addr = flag.String("addr", "127.0.0.1:8124", "Address of bulk inserter")

	// requests is the total number of single-row INSERTs issued across all workers.
	requests = flag.Int("total-requests", 1000000, "total number of single requests to send")

	// concurrency is the number of worker goroutines sending requests in parallel.
	concurrency = flag.Int("concurrency", 5000, "concurrency of the requests")

	// clickhouse switches main to the async_insert URL with a TSV body
	// instead of the default "INSERT ... VALUES" URL.
	clickhouse = flag.Bool("clickhouse-async", false, "Whether or not to use clickhouse-async")

	// clickhouseWait selects wait_for_async_insert=1 instead of 0; main only
	// consults it when clickhouse is set.
	clickhouseWait = flag.Bool("clickhouse-wait", false, "Whether or not to wait until INSERT has completed before sending a new request")

	// kittenhouse is accepted on the command line but is not read by main in this file.
	kittenhouse = flag.Bool("kittenhouse", false, "Whether or not to send INSERTs in kittenhouse format")

	// persistent is accepted on the command line but is not read by main in this file.
	persistent = flag.Bool("persistent", false, "(for kittenhouse) Whether or not use persistent mode")
)
func main() {
flag.Parse()
table := `InsertTest_buffer(id)`
postURL := "http://" + (*addr) + "/?query=INSERT%20INTO%20" + table + "%20VALUES"
if *clickhouse {
waitParam := "wait_for_async_insert=0"
if *clickhouseWait {
waitParam = "wait_for_async_insert=1"
}
postURL = "http://" + (*addr) + "/?" + waitParam + "&async_insert=1&query=INSERT%20INTO%20" + table + "%20FORMAT%20TSV"
}
bodyStr := "(1)"
if *clickhouse {
bodyStr = "1\n"
}
body := []byte(bodyStr)
start := time.Now()
var wg sync.WaitGroup
cl := &fasthttp.Client{
MaxConnsPerHost: 50000,
}
requestsLeft := int32(*requests)
log.Printf("Sending %d total requests to %q with concurrency %d", *requests, postURL, *concurrency)
for j := 0; j < *concurrency; j++ {
wg.Add(1)
go func() {
for atomic.AddInt32(&requestsLeft, -1) >= 0 {
req := fasthttp.AcquireRequest()
resp := fasthttp.AcquireResponse()
req.Reset()
resp.Reset()
req.SetBodyRaw(body)
req.SetRequestURI(postURL)
req.Header.SetMethod("POST")
err := cl.Do(req, resp)
if err != nil {
log.Fatalf("Request failed: %v", err)
}
if resp.StatusCode() != 200 {
log.Fatalf("ClickHouse responded with status code %d: %s", resp.StatusCode(), resp)
}
}
wg.Done()
}()
}
wg.Wait()
log.Printf("QPS: %.1f (for %s)", float64((*requests))/(float64(time.Since(start))/float64(time.Second)), time.Since(start))
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment