Created
May 1, 2018 03:34
-
-
Save manishrjain/137b9849f2672a5fc3a106815d9d41d7 to your computer and use it in GitHub Desktop.
Test Badger with TTL
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"encoding/binary" | |
"encoding/json" | |
"fmt" | |
"net/http" | |
_ "net/http/pprof" | |
"os" | |
"os/signal" | |
"path/filepath" | |
"time" | |
"github.com/Sirupsen/logrus" | |
"github.com/dgraph-io/badger" | |
"github.com/dgraph-io/badger/options" | |
"github.com/dgraph-io/badger/y" | |
) | |
// writable is one key/value pair queued up to be written to badger.
type writable struct {
	key, value []byte
}
// Package-level state used by the write and readKey methods.
var (
	// firstKey is the first key written; it is kept so it can be queried
	// later to see when it has expired from view.
	firstKey []byte
	// firstKeyStored reports whether firstKey has been recorded yet.
	firstKeyStored bool
	// stopWriting is not read by any live code visible in this file; only
	// commented-out code assigns it.
	stopWriting bool
)
func main() { | |
go func() { | |
// you can hit: | |
// go tool pprof http://localhost:8001/debug/pprof/heap | |
// go tool pprof http://localhost:8001/debug/pprof/profile | |
logrus.Infof("starting debug web server....") | |
logrus.Info(http.ListenAndServe("localhost:8001", nil)) | |
}() | |
done := make(chan struct{}) | |
bt := NewBadgerTest() | |
bt.Start() | |
<-done // wait | |
} | |
type BadgerTest struct { | |
db *badger.DB | |
} | |
func NewBadgerTest() *BadgerTest { | |
dir := "/home/mrjn/badgertest" | |
opts := badger.DefaultOptions | |
opts.Dir = dir | |
opts.ValueDir = dir | |
opts.ValueLogLoadingMode = options.FileIO | |
opts.TableLoadingMode = options.FileIO | |
// opts.NumCompactors = 20 | |
// opts.MaxTableSize = .25 * 1073741824 // .25GB | |
opts.NumMemtables = 2 | |
opts.ValueLogFileSize = 1 * 1073741824 // 2GB | |
opts.SyncWrites = false | |
bytes, _ := json.Marshal(&opts) | |
logrus.Infof("BADGER OPTIONS=%s", string(bytes)) | |
db, err := badger.Open(opts) | |
if err != nil { | |
panic(fmt.Sprintf("unable to open badger db; %s", err)) | |
} | |
bt := &BadgerTest{ | |
db: db, | |
} | |
go bt.filecounts(dir) | |
return bt | |
} | |
func (b *BadgerTest) Start() { | |
a := y.NewCloser(3) | |
c := make(chan os.Signal, 1) | |
signal.Notify(c, os.Interrupt) | |
go func() { | |
for range c { | |
fmt.Println("Signaling...") | |
a.SignalAndWait() | |
fmt.Printf("Closing, error: %v\n", b.db.Close()) | |
os.Exit(0) | |
} | |
}() | |
workers := 1 | |
for i := 0; i < workers; i++ { | |
go b.write(a) | |
} | |
//go b.write() | |
go b.badgerGC(a) | |
go func() { | |
defer a.Done() | |
tick := time.NewTicker(1 * time.Minute) | |
for { | |
select { | |
case <-tick.C: | |
b.readKey() | |
case <-a.HasBeenClosed(): | |
fmt.Println("Returning from readKey") | |
return | |
} | |
} | |
}() | |
//go func() { | |
// tick := time.NewTicker(15 * time.Minute) | |
// for _ = range tick.C { | |
// stopWriting = true | |
// return | |
// } | |
//}() | |
} | |
func (b *BadgerTest) Stop() { | |
b.db.Close() | |
logrus.Infof("%s shut down badger test", time.Now()) | |
time.Sleep(1 * time.Second) | |
} | |
func (b *BadgerTest) filecounts(dir string) { | |
ticker := time.NewTicker(60 * time.Second) | |
for _ = range ticker.C { | |
logFiles := int64(0) | |
sstFiles := int64(0) | |
_ = filepath.Walk(dir, func(path string, info os.FileInfo, err error) error { | |
if filepath.Ext(path) == ".sst" { | |
sstFiles++ | |
} | |
if filepath.Ext(path) == ".vlog" { | |
logFiles++ | |
} | |
return nil | |
}) | |
logrus.Infof("%s updated gauges vlog=%d sst=%d", time.Now(), logFiles, sstFiles) | |
} | |
} | |
func (b *BadgerTest) write(a *y.Closer) { | |
defer a.Done() | |
data := `{"randomstring":"6446D58D6DFAFD58586D3EA85A53F4A6B3CC057F933A22BB58E188A74AC8F663","refID":12495494,"testfield1234":"foo bar baz","date":"2018-01-01 12:00:00"}` | |
batchSize := 1 | |
rows := []writable{} | |
var cnt uint64 | |
for { | |
select { | |
case <-a.HasBeenClosed(): | |
logrus.Infof("%s stopping writing, let it catch up", time.Now()) | |
return | |
default: | |
} | |
// cnt = (cnt + 1) % 5 | |
cnt++ | |
// ts := time.Now().UnixNano() | |
buf := make([]byte, 8) | |
// binary.BigEndian.PutUint64(buf[0:], uint64(ts)) | |
binary.BigEndian.PutUint64(buf, cnt) | |
// fmt.Printf("Key: %x\n", buf) | |
if !firstKeyStored { | |
firstKey = buf | |
firstKeyStored = true | |
logrus.Infof("%s firstkey stored %x", time.Now(), firstKey) | |
} | |
w := writable{key: buf, value: []byte(data)} | |
rows = append(rows, w) | |
if len(rows) > batchSize { | |
b.saveRows(rows) | |
rows = []writable{} | |
} | |
} | |
} | |
func (b *BadgerTest) saveRows(rows []writable) { | |
// ttl := 30 * time.Minute | |
ttl := 90 * time.Second | |
txn := b.db.NewTransaction(true) | |
for _, row := range rows { | |
if err := txn.SetWithTTL(row.key, row.value, ttl); err == badger.ErrTxnTooBig { | |
logrus.Infof("%s TX too big, committing...", time.Now()) | |
_ = txn.Commit(nil) | |
txn = b.db.NewTransaction(true) | |
err = txn.SetWithTTL(row.key, row.value, ttl) | |
} else if err != nil && err != badger.ErrTxnTooBig { | |
logrus.Errorf("unable to set key-val in transaction; %s", err) | |
} | |
} | |
err := txn.Commit(nil) | |
if err != nil { | |
logrus.Errorf("unable to commit transaction; %s", err) | |
} | |
} | |
func (b *BadgerTest) readKey() { | |
// at some point our first key should be expired | |
err := b.db.View(func(txn *badger.Txn) error { | |
item, err := txn.Get([]byte(firstKey)) | |
if err != nil { | |
return err | |
} | |
val, err := item.Value() | |
if err != nil { | |
return err | |
} | |
logrus.Infof("%s FIRSTKEY VALUE=%s", time.Now(), val) | |
return nil | |
}) | |
if err != nil { | |
logrus.Errorf("%s FIRSTKEY unable to read first key from badger; %s", time.Now(), err) | |
} | |
} | |
func (b *BadgerTest) badgerGC(a *y.Closer) { | |
defer a.Done() | |
_, lastVlogSize := b.db.Size() | |
logrus.Infof("lastVLOGSize=%v", lastVlogSize) | |
ticker := time.NewTicker(2 * time.Minute) | |
const GB = int64(1 << 30) | |
for { | |
select { | |
case <-a.HasBeenClosed(): | |
fmt.Println("Returning from GC") | |
return | |
case <-ticker.C: | |
_, currentVlogSize := b.db.Size() | |
if currentVlogSize < lastVlogSize+GB { | |
continue | |
} | |
logrus.Infof("%s CLEANUP starting to purge keys", time.Now()) | |
err := b.db.PurgeOlderVersions() | |
if err != nil { | |
logrus.Errorf("%s badgerOps unable to purge older versions; %s", time.Now(), err) | |
} | |
logrus.Infof("%s CLEANUP PurgeOlderVersions completed", time.Now()) | |
// If size increased by 3.5 GB, then we run this 3 times. | |
numTimes := (currentVlogSize - lastVlogSize) / GB | |
for i := 0; i < int(numTimes); i++ { | |
err := b.db.RunValueLogGC(0.5) | |
if err != nil { | |
logrus.Errorf("%s badgerOps unable to RunValueLogGC; %s", time.Now(), err) | |
} | |
logrus.Infof("%s CLEANUP RunValueLogGC completed iteration=%d", time.Now(), i) | |
} | |
_, lastVlogSize = b.db.Size() | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment