Last active
April 12, 2024 06:33
-
-
Save h0hmj/ee07ba34762ed13ccf96ef4707233cdf to your computer and use it in GitHub Desktop.
rados bench 4MB io
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"fmt" | |
"os" | |
"sort" | |
"time" | |
"github.com/Workiva/go-datastructures/queue" | |
"github.com/alecthomas/kong" | |
"github.com/ceph/go-ceph/rados" | |
"github.com/dustin/go-humanize" | |
) | |
// objSize is the size in bytes of every object the benchmark reads or
// writes: 4 MiB.
const objSize uint64 = 4 << 20
// CommandEnum names the benchmark operation selected on the command line.
type CommandEnum string

// The operations the benchmark understands.
const (
	// CommandWrite selects the object write benchmark.
	CommandWrite = CommandEnum("write")
	// CommandRead selects the object read benchmark.
	CommandRead = CommandEnum("read")
)
type CLI struct { | |
Config string `kong:"default='/etc/ceph/ceph.conf',short='c',help='Ceph Config file path.'"` | |
MonIp string `kong:"default='',short='m',help='Ceph monitor ip.'"` | |
Key string `kong:"default='',short='k',help='Ceph admin keyring in text string'"` | |
Pool string `kong:"required,short='p',help='Ceph pool name.'"` | |
Conn int `kong:"default=1,help='Number of rados connections.'"` | |
CtxPerConn int `kong:"default=1,help='Number of rados ioctx per connection.'"` | |
Parallel int `kong:"default=1,short='P',help='Number of processes.'"` | |
Prefix string `kong:"required,help='Object prefix. Object name will be prefix-index.'"` | |
Command CommandEnum `kong:"arg,required,enum='write,read',help='read or write'"` | |
} | |
// total is the running count of operations reported so far; it is printed as
// "index" in each periodic report.
var total int
func createIoctx(cli *CLI) (*rados.Conn, []*rados.IOContext) { | |
conn, err := rados.NewConn() | |
if err != nil { | |
fmt.Printf("Failed to create RADOS connection: %v\n", err) | |
os.Exit(1) | |
} | |
err = conn.ReadConfigFile(cli.Config) | |
if err != nil { | |
if cli.MonIp == "" || cli.Key == "" { | |
fmt.Printf("No ceph config file or monip/key: %v\n", err) | |
os.Exit(1) | |
} | |
} | |
if cli.MonIp != "" && cli.Key != "" { | |
err = conn.SetConfigOption("mon_host", cli.MonIp) | |
if err != nil { | |
fmt.Printf("Failed to set mon_host: %v\n", err) | |
os.Exit(1) | |
} | |
err = conn.SetConfigOption("key", cli.Key) | |
if err != nil { | |
fmt.Printf("Failed to set key: %v\n", err) | |
os.Exit(1) | |
} | |
} | |
err = conn.Connect() | |
if err != nil { | |
fmt.Printf("Failed to connect to RADOS cluster: %v\n", err) | |
os.Exit(1) | |
} | |
var ioctxs []*rados.IOContext | |
for i := 0; i < cli.CtxPerConn; i++ { | |
ioctx, err := conn.OpenIOContext(cli.Pool) | |
if err != nil { | |
fmt.Printf("Failed to open IO context for pool '%s': %v\n", cli.Pool, err) | |
os.Exit(1) | |
} | |
ioctxs = append(ioctxs, ioctx) | |
} | |
return conn, ioctxs | |
} | |
// Request is a message sent through the priority queue to the metrics
// handler.
type Request struct {
	// Priority is the value Compare orders requests by.
	// Type selects the handler action: 0 records Duration as a latency
	// sample, 1 prints and resets the collected samples.
	Priority, Type int
	// Duration is the measured latency of one operation; it is only set on
	// Type 0 requests.
	Duration time.Duration
}
func (r *Request) Compare(other queue.Item) int { | |
otherRequest := other.(*Request) | |
if r.Priority < otherRequest.Priority { | |
return -1 | |
} else if r.Priority > otherRequest.Priority { | |
return 1 | |
} | |
return 0 | |
} | |
func handleMetricAdd(request *Request, latenciesMS *[]int64) { | |
*latenciesMS = append(*latenciesMS, request.Duration.Milliseconds()) | |
} | |
func handleMetricPrint(request *Request, latenciesMS *[]int64) { | |
l := make([]int64, len(*latenciesMS)) | |
copy(l, *latenciesMS) | |
*latenciesMS = []int64{} | |
go func() { | |
sort.Slice(l, func(i, j int) bool { return l[i] < l[j] }) | |
count := len(l) | |
if count != 0 { | |
sum := int64(0) | |
for _, v := range l { | |
sum += v | |
} | |
latAvg := float64(sum) / float64(count) | |
latP95 := l[int(0.95*float64(len(l)-1))] | |
bwByte := objSize * uint64(count) | |
total += count | |
fmt.Printf("%s, bw: %s/s, lat_avg: %d ms, lat_p95: %d ms, iops: %d/s, index: %d\n", time.Now().Format("2006-01-02 15:04:05"), humanize.Bytes(bwByte), int(latAvg), latP95, count, total) | |
} | |
}() | |
} | |
func handleRequest(pq *queue.PriorityQueue, latenciesMS *[]int64) { | |
for { | |
items, _ := pq.Get(1) | |
request := items[0].(*Request) | |
switch request.Type { | |
case 0: | |
handleMetricAdd(request, latenciesMS) | |
case 1: | |
handleMetricPrint(request, latenciesMS) | |
} | |
} | |
} | |
func run(cli *CLI) { | |
var latenciesMS []int64 | |
pq := queue.NewPriorityQueue(1, false) | |
go handleRequest(pq, &latenciesMS) | |
ticker := time.NewTicker(time.Second) | |
go func() { | |
for { | |
<-ticker.C | |
go pq.Put(&Request{Priority: 1, Type: 1}) | |
} | |
}() | |
var ioctxs []*rados.IOContext | |
for i := 0; i < cli.Conn; i++ { | |
conn, newIoctxs := createIoctx(cli) | |
defer conn.Shutdown() | |
ioctxs = append(ioctxs, newIoctxs...) | |
} | |
for i := range ioctxs { | |
defer ioctxs[i].Destroy() | |
} | |
ch := make(chan struct{}, cli.Parallel) | |
buf := make([]byte, objSize) | |
for { | |
// fixed object number 30w | |
for i := 0; i < 300000; i++ { | |
ch <- struct{}{} | |
go func(i int) { | |
objectName := fmt.Sprintf("%s-%d", cli.Prefix, i) | |
var ioctx *rados.IOContext | |
ioctx = ioctxs[i%(cli.Conn*cli.CtxPerConn)] | |
start := time.Now() | |
var err error | |
if cli.Command == CommandRead { | |
_, err = ioctx.Read(objectName, buf, 0) | |
} else if cli.Command == CommandWrite { | |
err = ioctx.WriteFull(objectName, buf) | |
} | |
latency := time.Since(start) | |
go pq.Put(&Request{Priority: 0, Type: 0, Duration: latency}) | |
if err != nil { | |
fmt.Printf("Failed to write object '%s': %v\n", objectName, err) | |
} else { | |
// fmt.Printf("Wrote object '%s'\n", objectName) | |
} | |
<-ch | |
}(i) | |
} | |
} | |
} | |
func main() { | |
var cli CLI | |
ctx := kong.Parse(&cli, | |
kong.Name("bench"), | |
kong.Description("A benchmarking tool for 4MiB ceph rados object read/write throughput."), | |
kong.UsageOnError(), | |
) | |
switch cli.Command { | |
case CommandWrite: | |
fallthrough | |
case CommandRead: | |
run(&cli) | |
default: | |
fmt.Println("Unknown command:", cli.Command) | |
ctx.PrintUsage(false) | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment