Skip to content

Instantly share code, notes, and snippets.

@h0hmj
Last active April 12, 2024 06:33
Show Gist options
  • Save h0hmj/ee07ba34762ed13ccf96ef4707233cdf to your computer and use it in GitHub Desktop.
Save h0hmj/ee07ba34762ed13ccf96ef4707233cdf to your computer and use it in GitHub Desktop.
rados bench 4MB io
package main
import (
"fmt"
"os"
"sort"
"time"
"github.com/Workiva/go-datastructures/queue"
"github.com/alecthomas/kong"
"github.com/ceph/go-ceph/rados"
"github.com/dustin/go-humanize"
)
// objSize is the size in bytes (4 MiB) of the payload buffer used for every
// object read/write, and the per-operation byte count used when computing
// the reported bandwidth.
const (
objSize = uint64(4 * 1024 * 1024)
)
// CommandEnum names the benchmark operation selected on the command line.
type CommandEnum string
// Supported benchmark operations; these are the values accepted by the
// CLI.Command positional argument.
const (
// CommandWrite selects the object write benchmark.
CommandWrite CommandEnum = "write"
// CommandRead selects the object read benchmark.
CommandRead CommandEnum = "read"
)
// CLI holds the command-line options of the benchmark. The struct tags are
// consumed by kong to build the flag set, defaults and help text.
type CLI struct {
// Config is the path of the ceph config file loaded before connecting.
Config string `kong:"default='/etc/ceph/ceph.conf',short='c',help='Ceph Config file path.'"`
// MonIp and Key are only applied when both are non-empty; together they
// can stand in for a missing/unreadable config file.
MonIp string `kong:"default='',short='m',help='Ceph monitor ip.'"`
Key string `kong:"default='',short='k',help='Ceph admin keyring in text string'"`
// Pool is the pool every IO context is opened on.
Pool string `kong:"required,short='p',help='Ceph pool name.'"`
// Conn is the number of rados connections created by run.
Conn int `kong:"default=1,help='Number of rados connections.'"`
// CtxPerConn is the number of IO contexts opened on each connection.
CtxPerConn int `kong:"default=1,help='Number of rados ioctx per connection.'"`
// Parallel bounds how many operations run uses in flight at once.
// NOTE(review): the help text says "processes", but run uses this value
// to limit concurrent goroutines inside a single process.
Parallel int `kong:"default=1,short='P',help='Number of processes.'"`
// Prefix is combined with the object index to form object names
// ("<prefix>-<index>").
Prefix string `kong:"required,help='Object prefix. Object name will be prefix-index.'"`
// Command is the positional argument selecting read or write.
Command CommandEnum `kong:"arg,required,enum='write,read',help='read or write'"`
}
// total is the running count of operations included in the periodic
// reports so far; handleMetricPrint increments it and prints it as "index".
var total = 0
// createIoctx opens one RADOS connection configured from cli and returns it
// together with cli.CtxPerConn IO contexts opened on cli.Pool.
//
// The config file at cli.Config is read first; a failure to read it is only
// tolerated when both cli.MonIp and cli.Key are supplied, in which case those
// two values are applied as the "mon_host" and "key" options. Any setup
// failure prints a message and terminates the process with exit status 1, so
// the function only returns on success.
func createIoctx(cli *CLI) (*rados.Conn, []*rados.IOContext) {
	// die reports a fatal setup error and exits; it never returns.
	die := func(format string, args ...interface{}) {
		fmt.Printf(format, args...)
		os.Exit(1)
	}

	conn, err := rados.NewConn()
	if err != nil {
		die("Failed to create RADOS connection: %v\n", err)
	}

	// Explicit monitor/key settings are only honoured when both are given.
	haveOverrides := cli.MonIp != "" && cli.Key != ""

	if err := conn.ReadConfigFile(cli.Config); err != nil && !haveOverrides {
		die("No ceph config file or monip/key: %v\n", err)
	}

	if haveOverrides {
		if err := conn.SetConfigOption("mon_host", cli.MonIp); err != nil {
			die("Failed to set mon_host: %v\n", err)
		}
		if err := conn.SetConfigOption("key", cli.Key); err != nil {
			die("Failed to set key: %v\n", err)
		}
	}

	if err := conn.Connect(); err != nil {
		die("Failed to connect to RADOS cluster: %v\n", err)
	}

	var ioctxs []*rados.IOContext
	for len(ioctxs) < cli.CtxPerConn {
		ioctx, err := conn.OpenIOContext(cli.Pool)
		if err != nil {
			die("Failed to open IO context for pool '%s': %v\n", cli.Pool, err)
		}
		ioctxs = append(ioctxs, ioctx)
	}
	return conn, ioctxs
}
// Request is the item exchanged through the priority queue between the
// benchmark workers / ticker and the metrics handler goroutine.
type Request struct {
// Priority is the ordering key compared by Compare. run enqueues latency
// samples with 0 and report ticks with 1.
Priority int
// Type selects the handler in handleRequest: 0 records Duration as a
// latency sample, 1 prints the accumulated statistics.
Type int
// Duration is the measured latency of one operation; it is read only for
// Type 0 requests.
Duration time.Duration
}
// Compare orders two requests by their Priority field, returning -1, 0 or 1
// when r's priority is respectively lower than, equal to, or higher than
// other's. other must be a *Request; any other type makes the type
// assertion panic.
func (r *Request) Compare(other queue.Item) int {
	rhs := other.(*Request)
	switch {
	case r.Priority < rhs.Priority:
		return -1
	case r.Priority > rhs.Priority:
		return 1
	default:
		return 0
	}
}
// handleMetricAdd records the latency carried by request, truncated to whole
// milliseconds, as one more sample in *latenciesMS.
func handleMetricAdd(request *Request, latenciesMS *[]int64) {
	ms := request.Duration.Milliseconds()
	samples := append(*latenciesMS, ms)
	*latenciesMS = samples
}
// handleMetricPrint reports statistics for the latency samples collected in
// *latenciesMS since the previous report and resets *latenciesMS to an empty
// slice so a new sampling window starts.
//
// For a non-empty window it prints one line with the bandwidth (sample count
// times objSize), average and 95th-percentile latency in milliseconds, the
// number of operations in the window, and the running total of operations
// reported so far. An empty window prints nothing. request is not inspected.
//
// The sorting and printing happen on a separate goroutine so the caller can
// keep draining the queue, but the shared counter `total` is updated here,
// on the caller's goroutine: report goroutines from consecutive ticks can
// overlap, and mutating `total` from inside them was a data race.
func handleMetricPrint(request *Request, latenciesMS *[]int64) {
	// Hand the collected samples to the reporter and give the caller a fresh
	// slice; the two never share a backing array, so no copy is needed.
	samples := *latenciesMS
	*latenciesMS = []int64{}

	count := len(samples)
	if count == 0 {
		return
	}

	total += count
	reported := total // snapshot so the goroutine never touches `total`

	go func() {
		sort.Slice(samples, func(i, j int) bool { return samples[i] < samples[j] })
		sum := int64(0)
		for _, v := range samples {
			sum += v
		}
		latAvg := float64(sum) / float64(count)
		latP95 := samples[int(0.95*float64(count-1))]
		bwByte := objSize * uint64(count)
		fmt.Printf("%s, bw: %s/s, lat_avg: %d ms, lat_p95: %d ms, iops: %d/s, index: %d\n", time.Now().Format("2006-01-02 15:04:05"), humanize.Bytes(bwByte), int(latAvg), latP95, count, reported)
	}()
}
// handleRequest is the metrics loop: it blocks on pq for the next request
// and dispatches it by Type — 0 adds a latency sample to *latenciesMS, 1
// prints and resets the accumulated statistics. Requests with any other Type
// are ignored.
//
// It is meant to run on its own goroutine and is the only code that touches
// *latenciesMS. The loop ends when pq.Get reports an error; previously that
// error was discarded and the subsequent items[0] access would panic on the
// empty result.
func handleRequest(pq *queue.PriorityQueue, latenciesMS *[]int64) {
	for {
		items, err := pq.Get(1)
		if err != nil {
			fmt.Printf("Failed to get request from queue: %v\n", err)
			return
		}
		// Range instead of indexing items[0] so an empty result is harmless.
		for _, item := range items {
			request := item.(*Request)
			switch request.Type {
			case 0:
				handleMetricAdd(request, latenciesMS)
			case 1:
				handleMetricPrint(request, latenciesMS)
			}
		}
	}
}
// run executes the benchmark selected by cli.Command until the process is
// killed.
//
// It starts the metrics handler goroutine and a one-second ticker that asks
// it to print statistics, opens cli.Conn connections with cli.CtxPerConn IO
// contexts each, and then loops forever over object indices 0..299999,
// issuing one full-object read or write per index with at most cli.Parallel
// operations in flight. Operations are spread over the IO contexts
// round-robin by object index, and each operation's latency is queued for
// the metrics handler whether or not it succeeded.
//
// The process exits with status 1 if cli.Conn, cli.CtxPerConn or
// cli.Parallel is below 1: a zero context count would make the round-robin
// modulo divide by zero, and a zero parallelism would block forever.
//
// Because the benchmark loop never terminates, the deferred cleanups below
// only document intent; they do not run when the process is killed.
func run(cli *CLI) {
	if cli.Conn < 1 || cli.CtxPerConn < 1 || cli.Parallel < 1 {
		fmt.Printf("connections, ioctx per connection and parallelism must all be at least 1 (got %d, %d, %d)\n", cli.Conn, cli.CtxPerConn, cli.Parallel)
		os.Exit(1)
	}

	var latenciesMS []int64
	pq := queue.NewPriorityQueue(1, false)
	go handleRequest(pq, &latenciesMS)

	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()
	go func() {
		for range ticker.C {
			// Priority 1 sorts after the priority-0 samples, so pending
			// samples are recorded before the report is printed.
			go pq.Put(&Request{Priority: 1, Type: 1})
		}
	}()

	var ioctxs []*rados.IOContext
	for i := 0; i < cli.Conn; i++ {
		conn, newIoctxs := createIoctx(cli)
		defer conn.Shutdown()
		ioctxs = append(ioctxs, newIoctxs...)
	}
	for i := range ioctxs {
		defer ioctxs[i].Destroy()
	}

	// slots is both the concurrency limiter and the buffer pool: taking a
	// buffer acquires one of cli.Parallel slots, returning it releases the
	// slot. Reads fill the buffer, so each slot gets its own buffer to avoid
	// concurrent operations writing into the same memory; writes only read
	// the payload, so every slot can share a single buffer.
	slots := make(chan []byte, cli.Parallel)
	var shared []byte
	if cli.Command != CommandRead {
		shared = make([]byte, objSize)
	}
	for j := 0; j < cli.Parallel; j++ {
		buf := shared
		if buf == nil {
			buf = make([]byte, objSize)
		}
		slots <- buf
	}

	for {
		// fixed object number: 300,000
		for i := 0; i < 300000; i++ {
			buf := <-slots
			go func(i int, buf []byte) {
				defer func() { slots <- buf }()

				objectName := fmt.Sprintf("%s-%d", cli.Prefix, i)
				ioctx := ioctxs[i%len(ioctxs)]

				start := time.Now()
				var err error
				switch cli.Command {
				case CommandRead:
					_, err = ioctx.Read(objectName, buf, 0)
				case CommandWrite:
					err = ioctx.WriteFull(objectName, buf)
				}
				latency := time.Since(start)

				// Queued from a separate goroutine so handing over the
				// sample never delays releasing the slot.
				go pq.Put(&Request{Priority: 0, Type: 0, Duration: latency})
				if err != nil {
					// Name the operation that actually failed (read or write).
					fmt.Printf("Failed to %s object '%s': %v\n", cli.Command, objectName, err)
				}
			}(i, buf)
		}
	}
}
// main parses the command line into a CLI value and starts the benchmark for
// the "write" and "read" commands. Any other command value is reported as
// unknown and followed by the usage text.
func main() {
	var cli CLI
	options := []kong.Option{
		kong.Name("bench"),
		kong.Description("A benchmarking tool for 4MiB ceph rados object read/write throughput."),
		kong.UsageOnError(),
	}
	ctx := kong.Parse(&cli, options...)

	if cli.Command == CommandWrite || cli.Command == CommandRead {
		run(&cli)
		return
	}

	fmt.Println("Unknown command:", cli.Command)
	ctx.PrintUsage(false)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment