Created
June 8, 2017 03:23
-
-
Save traetox/0d9b482067780933f1df0856e3911904 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"flag" | |
"fmt" | |
"log" | |
"math/rand" | |
"os" | |
"path/filepath" | |
"sync" | |
"time" | |
"github.com/dgraph-io/badger/y" | |
"github.com/pkg/profile" | |
"github.com/traetox/goaio" | |
) | |
// Command-line flags that configure the benchmark. The returned pointers are
// only meaningful after flag.Parse has run (main calls it first).
var (
	// dir is the directory that main walks to find the pre-created data files.
	dir = flag.String("dir", "datafiles", "Directory containing the data files to read from")
	// fReadSize is the size in bytes of every read; main rejects values that
	// are not a multiple of 4096.
	fReadSize = flag.Int64("readsize", 4096, "Read buffer size")
	// numReads is the total number of random reads a benchmark run issues.
	numReads = flag.Int64("num", 2000000, "Number of reads")
	// mode selects which benchmark variant main dispatches to.
	mode = flag.Int("mode", 1, "0 = serial, 1 = parallel, 2 = parallel via channel, 3 = parallel via AIO")
	// numGoroutines is the number of reader goroutines used by the parallel modes.
	numGoroutines = flag.Int("jobs", 8, "Number of Goroutines")
	// profilemode optionally enables one of the profilers started in main.
	profilemode = flag.String("profile.mode", "", "Enable profiling mode, one of [cpu, mem, mutex, block, trace]")
)

// readSize is the per-read buffer size in bytes. main copies it from the
// -readsize flag after parsing; the read helpers use it to size buffers and to
// bound random offsets.
var readSize int64
func getIndices(r *rand.Rand, flist []*os.File, maxFileSize int64) (*os.File, int64) { | |
fidx := r.Intn(len(flist)) | |
iidx := r.Int63n(maxFileSize - readSize) | |
return flist[fidx], iidx | |
} | |
func Serial(fList []*os.File, maxFileSize int64) { | |
startT := time.Now() | |
var i int64 = 0 | |
b := make([]byte, int(readSize)) | |
r := rand.New(rand.NewSource(time.Now().UnixNano())) | |
for ; i < *numReads; i++ { | |
fd, offset := getIndices(r, fList, maxFileSize) | |
_, err := fd.ReadAt(b, offset) | |
if err != nil { | |
log.Fatalf("Error reading file: %v", err) | |
} | |
if i%10000 == 0 { | |
log.Printf("Finished %v reads in serial", i) | |
} | |
} | |
fmt.Println("Serial: Number of random reads per second: ", | |
float64(*numReads)/time.Since(startT).Seconds()) | |
fmt.Println("Serial: Time Taken: ", time.Since(startT)) | |
} | |
func Conc2(fList []*os.File, maxFileSize int64) { | |
startT := time.Now() | |
var wg sync.WaitGroup | |
countPerGo := *numReads / int64(*numGoroutines) | |
fmt.Printf("Concurrent mode: Reads per goroutine: %d\n", countPerGo) | |
for k := 0; k < *numGoroutines; k++ { | |
wg.Add(1) | |
go func() { | |
b := make([]byte, int(readSize)) | |
r := rand.New(rand.NewSource(time.Now().UnixNano())) | |
var i int64 | |
for ; i < countPerGo; i++ { | |
fd, offset := getIndices(r, fList, maxFileSize) | |
_, err := fd.ReadAt(b, offset) | |
if err != nil { | |
log.Fatalf("Error reading file: %v", err) | |
} | |
} | |
wg.Done() | |
}() | |
} | |
wg.Wait() | |
fmt.Println("Concurrent 2: Number of random reads per second: ", | |
float64(*numReads)/time.Since(startT).Seconds()) | |
fmt.Println("Concurrent 2: Time Taken: ", time.Since(startT)) | |
} | |
// req describes one read to perform: the file to read from and the byte
// offset at which the read starts.
type req struct {
	fd     *os.File // file the read is issued against
	offset int64    // byte offset passed to ReadAt
}
func Conc3(fList []*os.File, maxFileSize int64) { | |
ch := make(chan req, 10000) | |
go func() { | |
var i int64 | |
rd := rand.New(rand.NewSource(time.Now().UnixNano())) | |
for i = 0; i < *numReads; i++ { | |
var r req | |
r.fd, r.offset = getIndices(rd, fList, maxFileSize) | |
ch <- r | |
} | |
close(ch) | |
}() | |
startT := time.Now() | |
var wg sync.WaitGroup | |
for k := 0; k < *numGoroutines; k++ { | |
wg.Add(1) | |
go func() { | |
b := make([]byte, int(readSize)) | |
for req := range ch { | |
_, err := req.fd.ReadAt(b, req.offset) | |
if err != nil { | |
log.Fatalf("Error reading file: %v", err) | |
} | |
} | |
wg.Done() | |
}() | |
} | |
wg.Wait() | |
fmt.Println("Concurrent 3: Number of random reads per second: ", | |
float64(*numReads)/time.Since(startT).Seconds()) | |
fmt.Println("Concurrent 3: Time Taken: ", time.Since(startT)) | |
} | |
type aioReq struct { | |
aio *goaio.AIO | |
offset int64 | |
} | |
func getAIOIndices(r *rand.Rand, aios []*goaio.AIO, maxFileSize int64) (*goaio.AIO, int64) { | |
fidx := r.Intn(len(aios)) | |
iidx := r.Int63n(maxFileSize - readSize) | |
return aios[fidx], iidx | |
} | |
func ConcAIO(fList []*os.File, maxFileSize int64) { | |
qdepth := int(16) //this is the primary tuning parameter, play with this a bit | |
aioCfg := goaio.AIOExtConfig{ | |
QueueDepth: qdepth, | |
} | |
//build up the AIO list | |
aioList := make([]*goaio.AIO, len(fList)) | |
for i := range fList { | |
aio, err := goaio.NewAIOExt(fList[i].Name(), aioCfg, os.O_RDONLY, 0755) | |
if err != nil { | |
fmt.Println("Failed to create new AIO for", fList[i].Name(), err) | |
return | |
} | |
aioList[i] = aio | |
} | |
ch := make(chan aioReq, 10000) | |
go func(aios []*goaio.AIO) { | |
var i int64 | |
rd := rand.New(rand.NewSource(time.Now().UnixNano())) | |
for i = 0; i < *numReads; i++ { | |
var r aioReq | |
r.aio, r.offset = getAIOIndices(rd, aios, maxFileSize) | |
ch <- r | |
} | |
close(ch) | |
}(aioList) | |
//this method of issuing and collecting random reads doesn't really fit the model of the AIO | |
//but we are going to ram a square peg through a round hold for consistency | |
//the data coming back into the buffer is going to be inconsistent as we aren't tracking requests to | |
//buffers, but its just for benchmarking | |
startT := time.Now() | |
buff := make([]byte, readSize) | |
ids := make([]goaio.RequestId, qdepth) | |
for req := range ch { | |
if !req.aio.Ready() { | |
//collect finished requests | |
if _, err := req.aio.WaitAny(ids); err != nil { | |
log.Fatal("Failed to collect requests", err) | |
} | |
} | |
if _, err := req.aio.ReadAt(buff, req.offset); err != nil { | |
log.Fatalf("Error reading file: %v", err) | |
} | |
} | |
//collect all outstanding requests | |
for _, aio := range aioList { | |
for { | |
//keep collecting until we get a zero back | |
n, err := aio.WaitAny(ids) | |
if err != nil { | |
log.Fatal("Failed to collect requests", err) | |
} | |
if n == 0 { | |
break | |
} | |
} | |
} | |
fmt.Println("Concurrent AIO: Number of random reads per second: ", | |
float64(*numReads)/time.Since(startT).Seconds()) | |
fmt.Println("Concurrent AIO: Time Taken: ", time.Since(startT)) | |
} | |
func main() { | |
flag.Parse() | |
readSize = *fReadSize | |
if (readSize & 0xFFF) != 0 { | |
log.Fatal("Read size must be an even multiple of 4K") | |
} | |
var flist []*os.File | |
var maxFileSize int64 | |
getFile := func(path string, info os.FileInfo, err error) error { | |
if err != nil { | |
log.Print(err) | |
return nil | |
} | |
if !info.IsDir() { | |
f, err := os.Open(path) | |
y.AssertTruef(err == nil, "Error opening file: %v", path) | |
flist = append(flist, f) | |
log.Println("Opened file:", path, "Size:", info.Size()/(1<<20), "MB") | |
maxFileSize = info.Size() | |
} | |
return nil | |
} | |
err := filepath.Walk(*dir, getFile) | |
if err != nil { | |
log.Fatalf("%v", err) | |
} | |
if len(flist) == 0 { | |
log.Fatalf("Must have files already created") | |
} | |
switch *profilemode { | |
case "cpu": | |
defer profile.Start(profile.CPUProfile).Stop() | |
case "mem": | |
defer profile.Start(profile.MemProfile).Stop() | |
case "mutex": | |
defer profile.Start(profile.MutexProfile).Stop() | |
case "block": | |
defer profile.Start(profile.BlockProfile).Stop() | |
case "trace": | |
defer profile.Start(profile.TraceProfile).Stop() | |
} | |
switch *mode { | |
case 0: | |
Serial(flist, maxFileSize) | |
case 1: | |
Conc2(flist, maxFileSize) | |
case 2: | |
Conc3(flist, maxFileSize) | |
case 3: | |
ConcAIO(flist, maxFileSize) | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment