Skip to content

Instantly share code, notes, and snippets.

@YangKeao
Created May 11, 2024 07:48
Show Gist options
  • Save YangKeao/c02ada99d2602d5e7fb5fb94f0581015 to your computer and use it in GitHub Desktop.
Save YangKeao/c02ada99d2602d5e7fb5fb94f0581015 to your computer and use it in GitHub Desktop.
A simple `select_random_points` benchmark.
package main
import (
"bufio"
"context"
"database/sql"
"encoding/binary"
"flag"
"fmt"
"math/rand"
"os"
"strings"
"sync"
"sync/atomic"
"time"
_ "github.com/go-sql-driver/mysql"
)
// Benchmark configuration. Only tableSize comes from the command line; the
// remaining knobs are fixed at compile time.
var (
	// tableSize points at the value of the -table-size flag. It is nil until
	// main registers the flag.
	tableSize *int

	// threads is the number of concurrent workers, each using its own
	// dedicated connection.
	threads = 100

	// paramCount is the number of random keys bound to each query.
	paramCount = 10

	// duration is how long the workers keep issuing queries.
	duration = 30 * time.Minute

	// reportDuration is the interval between two QPS reports.
	reportDuration = 30 * time.Second
)
// main runs the select_random_points benchmark.
//
// It opens one connection pool per comma-separated DSN given with -dsn, pins
// one connection and one prepared statement per worker (workers are spread
// round-robin over the pools), and then lets every worker issue point-select
// queries for the configured duration. While the workers run, the query rate
// is printed every reportDuration and the latency of every completed query is
// appended to the file "latency" as a little-endian uint64 nanosecond count.
//
// All connections are established before the clock starts, so connection
// setup is not part of the measured window and a setup failure aborts the run
// before any query has been issued.
func main() {
	dsns := flag.String("dsn", "username:password@/dbname", "The database DSN")
	tableSize = flag.Int("table-size", 10000000, "The size of the table")
	flag.Parse()

	// Every worker draws keys from its own slice of [0, table-size). A
	// zero-width slice would make the modulo in runWorker panic with a
	// division by zero, so reject it before opening anything.
	rangeLen := *tableSize / threads
	if rangeLen <= 0 {
		fmt.Printf("Error: -table-size must be at least %d (got %d)\n", threads, *tableSize)
		os.Exit(1)
	}

	var dbs []*sql.DB
	for _, dsn := range strings.Split(*dsns, ",") {
		db, err := sql.Open("mysql", dsn)
		if err != nil {
			panic(err)
		}
		// Deliberately deferred inside the loop: the pools live until main returns.
		defer db.Close()
		dbs = append(dbs, db)
	}

	ctx := context.Background()
	query := buildPointSelect(paramCount)
	stmts := make([]*sql.Stmt, 0, threads)
	for i := 0; i < threads; i++ {
		conn, err := dbs[i%len(dbs)].Conn(ctx)
		if err != nil {
			fmt.Printf("Thread %d: error: %v\n", i, err)
			return
		}
		defer conn.Close()
		stmt, err := conn.PrepareContext(ctx, query)
		if err != nil {
			fmt.Printf("Thread %d: error: %v\n", i, err)
			return
		}
		// Deferred calls run last-in first-out, so each statement is closed
		// before the connection it was prepared on.
		defer stmt.Close()
		stmts = append(stmts, stmt)
	}

	latencyFile, err := os.OpenFile("latency", os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0644)
	if err != nil {
		panic(err)
	}
	latencyFileWriter := bufio.NewWriter(latencyFile)
	latencyCh := make(chan time.Duration, 100)

	var queryCount atomic.Uint64
	stopReport := make(chan struct{})

	// background tracks the latency recorder and the QPS reporter.
	var background sync.WaitGroup
	background.Add(2)
	go func() {
		defer background.Done()
		recordLatencies(latencyFileWriter, latencyCh)
	}()
	go func() {
		defer background.Done()
		reportQPS(&queryCount, stopReport)
	}()

	startTime := time.Now()
	var workers sync.WaitGroup
	for i, stmt := range stmts {
		workers.Add(1)
		go func(id int, stmt *sql.Stmt) {
			defer workers.Done()
			runWorker(id, rangeLen, stmt, startTime, latencyCh, &queryCount)
		}(i, stmt)
	}

	// Only the workers send on latencyCh, so it is safe to close once they
	// have all returned. Stopping the reporter here also keeps main from
	// idling for the rest of the duration when every worker failed early.
	workers.Wait()
	close(stopReport)
	close(latencyCh)
	background.Wait()

	if err := latencyFile.Close(); err != nil {
		fmt.Printf("Error: %v\n", err)
	}
}

// buildPointSelect returns the benchmark query with n placeholders in its IN list.
func buildPointSelect(n int) string {
	placeholders := strings.TrimSuffix(strings.Repeat("?, ", n), ", ")
	return "SELECT id, k, c, pad FROM sbtest1 WHERE k IN (" + placeholders + ")"
}

// recordLatencies writes every sample received from samples to w as a
// little-endian uint64 nanosecond count and flushes w once samples is closed.
func recordLatencies(w *bufio.Writer, samples <-chan time.Duration) {
	var buf [8]byte
	failed := false
	for latency := range samples {
		if failed {
			// Keep draining after a write error: returning here would leave
			// the workers blocked forever on a full channel.
			continue
		}
		binary.LittleEndian.PutUint64(buf[:], uint64(latency.Nanoseconds()))
		// bufio.Writer.Write reports an error whenever it writes fewer bytes
		// than requested, so no separate short-write check is needed.
		if _, err := w.Write(buf[:]); err != nil {
			fmt.Printf("Error: %v\n", err)
			failed = true
		}
	}
	if failed {
		return
	}
	if err := w.Flush(); err != nil {
		fmt.Printf("Error: %v\n", err)
	}
}

// reportQPS prints the average query rate of each reportDuration interval until stop is closed.
func reportQPS(queryCount *atomic.Uint64, stop <-chan struct{}) {
	ticker := time.NewTicker(reportDuration)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			completed := queryCount.Swap(0)
			// Divide by fractional seconds so a sub-second interval cannot
			// turn the divisor into zero.
			fmt.Printf("QPS: %d\n", uint64(float64(completed)/reportDuration.Seconds()))
		case <-stop:
			return
		}
	}
}

// runWorker executes stmt with random keys from worker id's slice of the table
// until duration has elapsed since startTime or a query fails.
func runWorker(id, rangeLen int, stmt *sql.Stmt, startTime time.Time, latencyCh chan<- time.Duration, queryCount *atomic.Uint64) {
	params := make([]any, paramCount)
	for {
		start := time.Now()
		for j := range params {
			params[j] = (rand.Int() % rangeLen) + rangeLen*id
		}
		rows, err := stmt.Query(params...)
		if err != nil {
			fmt.Printf("Thread %d: error: %v\n", id, err)
			return
		}
		for rows.Next() {
			// Drain the result set; the row contents are not needed.
		}
		// Next also returns false when the stream breaks mid-way, so the
		// iteration error has to be checked in addition to the Close error.
		err = rows.Err()
		if closeErr := rows.Close(); err == nil {
			err = closeErr
		}
		if err != nil {
			fmt.Printf("Thread %d: error: %v\n", id, err)
			return
		}
		end := time.Now()
		if end.Sub(startTime) > duration {
			// A query that finishes after the deadline is not recorded.
			fmt.Printf("Thread %d: done\n", id)
			return
		}
		latencyCh <- end.Sub(start)
		queryCount.Add(1)
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment