@traetox
Created June 8, 2017 03:23
package main

import (
	"flag"
	"fmt"
	"log"
	"math/rand"
	"os"
	"path/filepath"
	"sync"
	"time"

	"github.com/dgraph-io/badger/y"
	"github.com/pkg/profile"
	"github.com/traetox/goaio"
)

var (
	dir           = flag.String("dir", "datafiles", "Directory containing the data files to read from")
	fReadSize     = flag.Int64("readsize", 4096, "Read buffer size")
	numReads      = flag.Int64("num", 2000000, "Number of reads")
	mode          = flag.Int("mode", 1, "0 = serial, 1 = parallel, 2 = parallel via channel, 3 = parallel via AIO")
	numGoroutines = flag.Int("jobs", 8, "Number of Goroutines")
	profilemode   = flag.String("profile.mode", "", "Enable profiling mode, one of [cpu, mem, mutex, block, trace]")
)
var readSize int64
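
// getIndices picks a random open file from flist and a random offset that leaves
// room for a full readSize read before the end of the file.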
func getIndices(r *rand.Rand, flist []*os.File, maxFileSize int64) (*os.File, int64) {
	fidx := r.Intn(len(flist))
	iidx := r.Int63n(maxFileSize - readSize)
	return flist[fidx], iidx
}
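
// Serial issues *numReads random ReadAt calls one after another from a single
// goroutine, logging progress every 10000 reads.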
func Serial(fList []*os.File, maxFileSize int64) {
	startT := time.Now()
	var i int64
	b := make([]byte, int(readSize))
	r := rand.New(rand.NewSource(time.Now().UnixNano()))
	for ; i < *numReads; i++ {
		fd, offset := getIndices(r, fList, maxFileSize)
		_, err := fd.ReadAt(b, offset)
		if err != nil {
			log.Fatalf("Error reading file: %v", err)
		}
		if i%10000 == 0 {
			log.Printf("Finished %v reads in serial", i)
		}
	}
	fmt.Println("Serial: Number of random reads per second: ",
		float64(*numReads)/time.Since(startT).Seconds())
	fmt.Println("Serial: Time Taken: ", time.Since(startT))
}
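
// Conc2 splits the reads evenly across *numGoroutines goroutines; each goroutine
// owns its own buffer and RNG and issues its share of ReadAt calls independently.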
func Conc2(fList []*os.File, maxFileSize int64) {
	startT := time.Now()
	var wg sync.WaitGroup
	countPerGo := *numReads / int64(*numGoroutines)
	fmt.Printf("Concurrent mode: Reads per goroutine: %d\n", countPerGo)
	for k := 0; k < *numGoroutines; k++ {
		wg.Add(1)
		go func() {
			b := make([]byte, int(readSize))
			r := rand.New(rand.NewSource(time.Now().UnixNano()))
			var i int64
			for ; i < countPerGo; i++ {
				fd, offset := getIndices(r, fList, maxFileSize)
				_, err := fd.ReadAt(b, offset)
				if err != nil {
					log.Fatalf("Error reading file: %v", err)
				}
			}
			wg.Done()
		}()
	}
	wg.Wait()
	fmt.Println("Concurrent 2: Number of random reads per second: ",
		float64(*numReads)/time.Since(startT).Seconds())
	fmt.Println("Concurrent 2: Time Taken: ", time.Since(startT))
}
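
// req describes a single random read: which file to read and at what offset.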
type req struct {
	fd     *os.File
	offset int64
}
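
// Conc3 uses a single producer goroutine to push read requests onto a buffered
// channel while *numGoroutines worker goroutines drain the channel and issue the reads.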
func Conc3(fList []*os.File, maxFileSize int64) {
	ch := make(chan req, 10000)
	go func() {
		var i int64
		rd := rand.New(rand.NewSource(time.Now().UnixNano()))
		for i = 0; i < *numReads; i++ {
			var r req
			r.fd, r.offset = getIndices(rd, fList, maxFileSize)
			ch <- r
		}
		close(ch)
	}()
	startT := time.Now()
	var wg sync.WaitGroup
	for k := 0; k < *numGoroutines; k++ {
		wg.Add(1)
		go func() {
			b := make([]byte, int(readSize))
			for req := range ch {
				_, err := req.fd.ReadAt(b, req.offset)
				if err != nil {
					log.Fatalf("Error reading file: %v", err)
				}
			}
			wg.Done()
		}()
	}
	wg.Wait()
	fmt.Println("Concurrent 3: Number of random reads per second: ",
		float64(*numReads)/time.Since(startT).Seconds())
	fmt.Println("Concurrent 3: Time Taken: ", time.Since(startT))
}
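
// aioReq is the asynchronous analogue of req: a goaio handle plus an offset.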
type aioReq struct {
	aio    *goaio.AIO
	offset int64
}
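
// getAIOIndices mirrors getIndices, but selects from the AIO handles instead of *os.File.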
func getAIOIndices(r *rand.Rand, aios []*goaio.AIO, maxFileSize int64) (*goaio.AIO, int64) {
	fidx := r.Intn(len(aios))
	iidx := r.Int63n(maxFileSize - readSize)
	return aios[fidx], iidx
}
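
// ConcAIO reopens each file through goaio with a fixed queue depth, issues the reads
// asynchronously, and collects completions with WaitAny whenever the queue is full.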
func ConcAIO(fList []*os.File, maxFileSize int64) {
	qdepth := 16 //this is the primary tuning parameter, play with this a bit
	aioCfg := goaio.AIOExtConfig{
		QueueDepth: qdepth,
	}
	//build up the AIO list
	aioList := make([]*goaio.AIO, len(fList))
	for i := range fList {
		aio, err := goaio.NewAIOExt(fList[i].Name(), aioCfg, os.O_RDONLY, 0755)
		if err != nil {
			fmt.Println("Failed to create new AIO for", fList[i].Name(), err)
			return
		}
		aioList[i] = aio
	}
	ch := make(chan aioReq, 10000)
	go func(aios []*goaio.AIO) {
		var i int64
		rd := rand.New(rand.NewSource(time.Now().UnixNano()))
		for i = 0; i < *numReads; i++ {
			var r aioReq
			r.aio, r.offset = getAIOIndices(rd, aios, maxFileSize)
			ch <- r
		}
		close(ch)
	}(aioList)
	//this method of issuing and collecting random reads doesn't really fit the AIO model,
	//but we are going to ram a square peg through a round hole for consistency.
	//The data coming back into the buffer will be inconsistent because we aren't tracking
	//which request owns which buffer, but that's fine for benchmarking.
	startT := time.Now()
	buff := make([]byte, readSize)
	ids := make([]goaio.RequestId, qdepth)
	for req := range ch {
		if !req.aio.Ready() {
			//collect finished requests
			if _, err := req.aio.WaitAny(ids); err != nil {
				log.Fatal("Failed to collect requests", err)
			}
		}
		if _, err := req.aio.ReadAt(buff, req.offset); err != nil {
			log.Fatalf("Error reading file: %v", err)
		}
	}
	//collect all outstanding requests
	for _, aio := range aioList {
		for {
			//keep collecting until we get a zero back
			n, err := aio.WaitAny(ids)
			if err != nil {
				log.Fatal("Failed to collect requests", err)
			}
			if n == 0 {
				break
			}
		}
	}
	fmt.Println("Concurrent AIO: Number of random reads per second: ",
		float64(*numReads)/time.Since(startT).Seconds())
	fmt.Println("Concurrent AIO: Time Taken: ", time.Since(startT))
}
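
// main opens every regular file found under -dir, records the file size as maxFileSize
// (all data files are assumed to be the same size), optionally enables profiling, and
// runs the benchmark selected by -mode. The data files must already exist; one
// illustrative way to create them is:
//	dd if=/dev/urandom of=datafiles/f0 bs=1M count=1024
// Example run (illustrative flag values):
//	go run . -dir datafiles -mode 3 -jobs 16 -readsize 4096 -num 2000000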
func main() {
	flag.Parse()
	readSize = *fReadSize
	if (readSize & 0xFFF) != 0 {
		log.Fatal("Read size must be a multiple of 4K")
	}
	var flist []*os.File
	var maxFileSize int64
	getFile := func(path string, info os.FileInfo, err error) error {
		if err != nil {
			log.Print(err)
			return nil
		}
		if !info.IsDir() {
			f, err := os.Open(path)
			y.AssertTruef(err == nil, "Error opening file: %v", path)
			flist = append(flist, f)
			log.Println("Opened file:", path, "Size:", info.Size()/(1<<20), "MB")
			maxFileSize = info.Size() //assumes every data file is the same size
		}
		return nil
	}
	err := filepath.Walk(*dir, getFile)
	if err != nil {
		log.Fatalf("%v", err)
	}
	if len(flist) == 0 {
		log.Fatalf("Must have files already created")
	}
	switch *profilemode {
	case "cpu":
		defer profile.Start(profile.CPUProfile).Stop()
	case "mem":
		defer profile.Start(profile.MemProfile).Stop()
	case "mutex":
		defer profile.Start(profile.MutexProfile).Stop()
	case "block":
		defer profile.Start(profile.BlockProfile).Stop()
	case "trace":
		defer profile.Start(profile.TraceProfile).Stop()
	}
	switch *mode {
	case 0:
		Serial(flist, maxFileSize)
	case 1:
		Conc2(flist, maxFileSize)
	case 2:
		Conc3(flist, maxFileSize)
	case 3:
		ConcAIO(flist, maxFileSize)
	}
}