Skip to content

Instantly share code, notes, and snippets.

@kung-foo
Created August 19, 2019 17:25
Show Gist options
  • Save kung-foo/66317e4b274ec456a92270e32d692ff7 to your computer and use it in GitHub Desktop.
Save kung-foo/66317e4b274ec456a92270e32d692ff7 to your computer and use it in GitHub Desktop.
Badger test harness
package main
import (
"bytes"
"context"
"encoding/csv"
"fmt"
"log"
"math/rand"
"os"
"os/signal"
"path/filepath"
"strconv"
"sync"
"time"
"github.com/dgraph-io/badger"
)
var wg sync.WaitGroup
const (
ttl = time.Minute * 15
gcInterval = time.Minute * 1
valueLogMaxEntries = 100000
valueSz = 256
)
func main() {
options := badger.DefaultOptions(filepath.Join(".", "store"))
options = options.WithSyncWrites(true)
options = options.WithValueLogMaxEntries(valueLogMaxEntries)
db, err := badger.Open(options)
if err != nil {
log.Fatal(err)
}
defer func() {
wg.Wait()
db.Close()
}()
ctx := cancelOnInterrupt(context.Background())
go runGC(ctx, db)
go runStats(ctx, db)
go runInsert(ctx, db)
<-ctx.Done()
}
func runStats(ctx context.Context, db *badger.DB) {
f, _ := os.Create("stats.csv")
defer f.Close()
statsCSV := csv.NewWriter(f)
statsCSV.Write([]string{
"ts",
"valid",
"interal",
"lsm",
"vlog",
})
count := func(prefix []byte) uint64 {
c := uint64(0)
if err := db.View(func(tx *badger.Txn) error {
opts := badger.DefaultIteratorOptions
opts.PrefetchValues = false
opts.InternalAccess = bytes.HasPrefix(prefix, []byte("!badger!"))
it := tx.NewIterator(opts)
defer it.Close()
for it.Seek(prefix); it.ValidForPrefix(prefix); it.Next() {
c++
}
return nil
}); err != nil {
log.Fatal(err)
}
return c
}
globSz := func(glob string) int64 {
matches, _ := filepath.Glob(glob)
sz := int64(0)
for _, p := range matches {
fi, _ := os.Stat(p)
sz += fi.Size()
}
return sz
}
ticker := time.NewTicker(time.Second * 5)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case t := <-ticker.C:
valid := count([]byte("key/"))
internal := count([]byte("!badger!"))
// log.Printf("valid=%d, internal=%d", valid, internal)
statsCSV.Write([]string{
t.Format("2006-01-02 15:04:05"),
strconv.Itoa(int(valid)),
strconv.Itoa(int(internal)),
strconv.Itoa(int(globSz("store/*.sst"))),
strconv.Itoa(int(globSz("store/*.vlog"))),
})
statsCSV.Flush()
}
}
}
func runInsert(ctx context.Context, db *badger.DB) {
seq, err := db.GetSequence([]byte("sequence"), 1000)
if err != nil {
log.Fatal(err)
}
defer seq.Release()
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case t := <-ticker.C:
// log.Printf("inserting...")
expireAt := uint64(t.Add(ttl).Unix())
v := make([]byte, valueSz)
if _, err := rand.Read(v); err != nil {
log.Fatal(err)
}
func() {
wb := db.NewWriteBatch()
defer wb.Cancel()
for i := 0; i <= 1000; i++ {
s, _ := seq.Next()
k := fmt.Sprintf("key/%08x", s)
e := badger.NewEntry([]byte(k), v)
e.ExpiresAt = expireAt
if err := wb.SetEntry(e); err != nil {
log.Fatal(err)
}
}
wb.Flush()
}()
}
}
}
func runGC(ctx context.Context, db *badger.DB) {
const ratio = 0.10
gc := func() {
wg.Add(1)
log.Printf("running gc...")
defer wg.Done()
for {
if err := db.RunValueLogGC(ratio); err != nil {
if err == badger.ErrNoRewrite {
return
} else {
log.Fatal(err)
}
}
}
}
gc()
ticker := time.NewTicker(gcInterval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
gc()
}
}
}
func cancelOnInterrupt(ctx context.Context) context.Context {
ctx, cancel := context.WithCancel(ctx)
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)
go func() {
select {
case <-ctx.Done():
case <-c:
println() // on Ctrl-C make it look a bit nicer
}
cancel()
}()
return ctx
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment