Skip to content

Instantly share code, notes, and snippets.

@alram
Created March 24, 2023 17:13
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save alram/ecfcf90035403fcf5a2dfff27daabf37 to your computer and use it in GitHub Desktop.
Save alram/ecfcf90035403fcf5a2dfff27daabf37 to your computer and use it in GitHub Desktop.
package main
import (
"fmt"
"log"
"os"
"os/exec"
"regexp"
"strconv"
"sync"
"time"
"github.com/ceph/go-ceph/rados"
uuid "github.com/nu7hatch/gouuid"
"github.com/urfave/cli"
)
// data is the 512-byte, zero-filled payload shared by the benchmark as the
// value for omap entries.
var data = make([]byte, 512)
// Command-line flags accepted by the benchmark.
var (
	// radosPoolFlag selects the RADOS pool the benchmark runs against.
	radosPoolFlag = cli.StringFlag{
		Name:  "pool",
		Usage: "RADOS pool. Tool is meant to be used on a pool with one PG if you use --do-ops-on-non-benched-obj",
		Value: "omap-bench",
	}

	// radosObjFlag selects the RADOS object whose omap is benchmarked.
	radosObjFlag = cli.StringFlag{
		Name:  "object",
		Usage: "RADOS object to run omap bench on",
		Value: "omapbench",
	}

	// concurrencyFlag ("--limit") caps the total number of requests per second.
	concurrencyFlag = cli.Int64Flag{
		Name:  "limit",
		Usage: "Maximum number of req/s",
		Value: 3000,
	}

	// insertWeightFlag is the relative share of the request budget given to inserts.
	insertWeightFlag = cli.Int64Flag{
		Name:  "insert-weight",
		Usage: "Weighted value for inserts against lists and deletes reqs",
		Value: 100,
	}

	// listWeightFlag is the relative share of the request budget given to lists.
	listWeightFlag = cli.Int64Flag{
		Name:  "list-weight",
		Usage: "Weighted value for list against inserts and deletes reqs. Note that deletes do lists as well",
		Value: 100,
	}

	// deleteWeightFlag is the relative share of the request budget given to deletes.
	deleteWeightFlag = cli.Int64Flag{
		Name:  "delete-weight",
		Usage: "Weighted value for deletes against inserts and lists reqs",
		Value: 100,
	}

	// randomOpsOnSecondaryObjFlag enables the "neighbor" mode described in its
	// usage text; it defaults to off.
	randomOpsOnSecondaryObjFlag = cli.BoolFlag{
		Name:  "do-ops-on-non-benched-obj",
		Usage: "This will randomly do ops on a random object to see how the bench is affecting neighbors. Only supported with 1 PG in pool",
	}
)
// pgNumRegex extracts the PG count from the "pg_num: N" text printed by
// `ceph osd pool get <pool> pg_num`. It is compiled once at package scope.
var pgNumRegex = regexp.MustCompile(`pg_num: (\d+)`)

// isNumPGOne reports whether pool has exactly one PG.
//
// It runs `docker exec ceph-tools ceph osd pool get <pool> pg_num` and parses
// the command's standard output. If the command fails, or its output does not
// contain a usable "pg_num: N" value, the process is terminated through
// log.Fatalf.
func isNumPGOne(pool string) bool {
	out, err := exec.Command(
		"docker", "exec", "ceph-tools",
		"ceph", "osd", "pool",
		"get", pool, "pg_num",
	).Output()
	if err != nil {
		// Pass the error as an argument, never as the format string: error
		// text may contain '%' verbs.
		log.Fatalf("Could not query pg_num of pool %s: %v", pool, err)
	}

	m := pgNumRegex.FindSubmatch(out)
	if len(m) != 2 {
		log.Fatalf("Could not determine how many PGs pool %s has\n", pool)
	}

	// The regex only admits digits, so Atoi can only fail on overflow; treat
	// that as unparsable output rather than silently reading it as 0.
	numPG, err := strconv.Atoi(string(m[1]))
	if err != nil {
		log.Fatalf("Could not parse pg_num of pool %s: %v", pool, err)
	}
	return numPG == 1
}
// writeOmapData issues numWrites SetOmap requests against obj and returns the
// elapsed wall-clock time in milliseconds.
//
// Every request inserts exactly one new key (a freshly generated UUID) whose
// value is the package-level data buffer. Failures to generate a key or to
// write it are logged and the loop carries on with the next request. A
// non-positive numWrites performs no requests.
func writeOmapData(ioctx *rados.IOContext, obj string, numWrites int64) int64 {
	start := time.Now()
	for i := int64(0); i < numWrites; i++ {
		id, err := uuid.NewV4()
		if err != nil {
			// Without a key there is nothing to write; calling String on the
			// failed result would dereference nil.
			log.Printf("Error while generating omap key: %s", err)
			continue
		}

		// Send a single-entry map per request. Reusing one growing map would
		// make request i re-send all i keys written so far, so the payload
		// (and the measured time) would grow quadratically while storing the
		// very same set of keys.
		entry := map[string][]byte{id.String(): data}
		if err := ioctx.SetOmap(obj, entry); err != nil {
			log.Printf("Error while writting omap data: %s", err.Error())
		}
	}
	return time.Since(start).Milliseconds()
}
// deleteOmapKeys fetches up to numKeys omap entries of obj and removes them.
//
// It returns the elapsed time in milliseconds together with a flag that is
// true when fewer than numKeys entries were returned by the listing. When
// numKeys is not positive nothing is done and (0, false) is returned. Any
// listing or removal error terminates the process through log.Fatalf.
func deleteOmapKeys(ioctx *rados.IOContext, obj string, numKeys int64) (int64, bool) {
	if numKeys <= 0 {
		return 0, false
	}

	began := time.Now()

	entries, err := ioctx.GetOmapValues(obj, "", "", numKeys)
	if err != nil {
		log.Fatalf("Couldn't get omap vals: %s", err.Error())
	}

	// Only the keys are needed for the removal request.
	victims := make([]string, 0, len(entries))
	for k := range entries {
		victims = append(victims, k)
	}

	if rmErr := ioctx.RmOmapKeys(obj, victims); rmErr != nil {
		log.Fatalf("Couldn't delete omap keys: %s", rmErr.Error())
	}

	exhausted := int64(len(entries)) < numKeys
	return time.Since(began).Milliseconds(), exhausted
}
// listOmapKeys issues numKeys GetOmapValues requests against obj, each asking
// for at most numKeys entries, and returns the elapsed wall-clock time in
// milliseconds. The fetched entries are discarded; only the timing matters.
//
// When numKeys is less than 1 nothing is done and 0 is returned. A failed
// request terminates the process through log.Fatalf.
//
// NOTE(review): numKeys serves both as the request count and as the per-request
// page size — confirm that is the intended workload.
func listOmapKeys(ioctx *rados.IOContext, obj string, numKeys int64) int64 {
	if numKeys < 1 {
		return 0
	}

	start := time.Now()
	for i := int64(0); i < numKeys; i++ {
		if _, err := ioctx.GetOmapValues(obj, "", "", numKeys); err != nil {
			// Report the cause, matching the messages of the insert/delete paths.
			log.Fatalf("Couldn't list omap vals: %s", err.Error())
		}
	}
	return time.Since(start).Milliseconds()
}
// run drives the benchmark against the pool and object selected on the
// command line, using the already established connection conn.
//
// The request limit is split between inserts, lists and deletes in proportion
// to their weights. Then, once per round, the three workloads run concurrently
// and their durations are printed as one CSV-like line
// ("insert, delete, list", in milliseconds). A round never lasts less than one
// second. The loop ends when the delete workload reports that it found fewer
// keys than it asked for.
//
// Invalid weights, a pool that cannot be opened, or a failed PG check terminate
// the process through log.Fatal/log.Fatalf.
func run(ctx *cli.Context, conn *rados.Conn) {
	pool := ctx.String(radosPoolFlag.Name)
	obj := ctx.String(radosObjFlag.Name)
	benchRate := ctx.Int64(concurrencyFlag.Name)

	insertWeight := ctx.Int64(insertWeightFlag.Name)
	listWeight := ctx.Int64(listWeightFlag.Name)
	deleteWeight := ctx.Int64(deleteWeightFlag.Name)
	if insertWeight < 0 || listWeight < 0 || deleteWeight < 0 {
		log.Fatalf("Weights must not be negative (insert: %d, list: %d, delete: %d)",
			insertWeight, listWeight, deleteWeight)
	}
	total := insertWeight + listWeight + deleteWeight
	if total == 0 {
		// Dividing by a zero total would yield NaN rates.
		log.Fatal("At least one of the insert, list and delete weights must be greater than zero")
	}

	// share converts a weight into its slice of the overall request limit.
	share := func(weight int64) int64 {
		return int64((float64(weight) / float64(total)) * float64(benchRate))
	}
	insertRate := share(insertWeight)
	listRate := share(listWeight)
	deleteRate := share(deleteWeight)

	ioctx, err := conn.OpenIOContext(pool)
	if err != nil {
		log.Fatalf("Could not open IO context on pool %s: %v", pool, err)
	}
	// Release the IO context when the benchmark finishes normally.
	defer ioctx.Destroy()

	if ctx.Bool(randomOpsOnSecondaryObjFlag.Name) && !isNumPGOne(pool) {
		log.Fatalf("--do-ops-on-non-benched-obj is set but %s has more than 1 PG", pool)
	}

	log.Printf("Start benchmark on pool %s, object: %s\n", pool, obj)
	log.Printf("Req distribution (max: %d/s): %d list, %d insert, %d delete", benchRate, listRate, insertRate, deleteRate)
	fmt.Println("insert (ms), delete (ms), list (ms)")

	for {
		start := time.Now()

		// Each goroutine writes only its own variables; wg.Wait orders those
		// writes before the reads below.
		var (
			wg                         sync.WaitGroup
			insertMs, listMs, deleteMs int64
			done                       bool
		)
		wg.Add(3)
		go func() {
			defer wg.Done()
			insertMs = writeOmapData(ioctx, obj, insertRate)
		}()
		go func() {
			defer wg.Done()
			listMs = listOmapKeys(ioctx, obj, listRate)
		}()
		go func() {
			defer wg.Done()
			deleteMs, done = deleteOmapKeys(ioctx, obj, deleteRate)
		}()
		wg.Wait()

		// Pad the round to a full second so the rates are per-second limits.
		if elapsed := time.Since(start); elapsed < time.Second {
			time.Sleep(time.Second - elapsed)
		}

		fmt.Printf("%10d, %11d, %9d\n", insertMs, deleteMs, listMs)
		if done {
			log.Println("No more keys to delete. Benchmark is done.")
			return
		}
	}
}
// main wires the command-line flags into a cli application whose action
// connects to the cluster using the default Ceph configuration file and then
// runs the benchmark. Any error returned by the application is fatal.
func main() {
	app := &cli.App{
		Name:  "omap-bench",
		Usage: "(Eventually) flexible omap benchmarking tool",
		Flags: []cli.Flag{
			&radosPoolFlag,
			&radosObjFlag,
			&concurrencyFlag,
			&insertWeightFlag,
			&listWeightFlag,
			&deleteWeightFlag,
			&randomOpsOnSecondaryObjFlag,
		},
		Action: func(ctx *cli.Context) error {
			// Check every step of the connection setup: continuing after a
			// failure would only crash later on a nil or unconnected handle.
			conn, err := rados.NewConn()
			if err != nil {
				return fmt.Errorf("creating rados connection: %w", err)
			}
			if err := conn.ReadDefaultConfigFile(); err != nil {
				return fmt.Errorf("reading default ceph config file: %w", err)
			}
			if err := conn.Connect(); err != nil {
				return fmt.Errorf("connecting to ceph cluster: %w", err)
			}
			defer conn.Shutdown()

			run(ctx, conn)
			return nil
		},
	}

	if err := app.Run(os.Args); err != nil {
		log.Fatal(err)
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment