@manishrjain
Created May 1, 2018 03:34
Test Badger with TTL
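
The program below hammers a Badger instance with small JSON values written under a short TTL (90 seconds), reads back the first key once a minute to watch it expire from view, and periodically runs PurgeOlderVersions plus RunValueLogGC to reclaim value-log space. Ctrl-C triggers a graceful shutdown via y.Closer.
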
package main

import (
	"encoding/binary"
	"encoding/json"
	"fmt"
	"net/http"
	_ "net/http/pprof"
	"os"
	"os/signal"
	"path/filepath"
	"time"

	"github.com/dgraph-io/badger"
	"github.com/dgraph-io/badger/options"
	"github.com/dgraph-io/badger/y"
	"github.com/sirupsen/logrus"
)
type writable struct {
	key   []byte
	value []byte
}

var (
	// Store the first key we've seen so we can query it later and watch it
	// expire from view. Written by the writer goroutine and read by the
	// reader goroutine without synchronization; good enough for this test.
	firstKey       []byte
	firstKeyStored bool
	// stopWriting is only set by the commented-out timer goroutine in Start.
	stopWriting = false
)
func main() {
	go func() {
		// While this runs, profiles are available via:
		//   go tool pprof http://localhost:8001/debug/pprof/heap
		//   go tool pprof http://localhost:8001/debug/pprof/profile
		logrus.Infof("starting debug web server....")
		logrus.Info(http.ListenAndServe("localhost:8001", nil))
	}()

	done := make(chan struct{})
	bt := NewBadgerTest()
	bt.Start()
	<-done // block forever; shutdown happens via the signal handler in Start
}
type BadgerTest struct {
	db *badger.DB
}

func NewBadgerTest() *BadgerTest {
	dir := "/home/mrjn/badgertest"
	opts := badger.DefaultOptions
	opts.Dir = dir
	opts.ValueDir = dir
	opts.ValueLogLoadingMode = options.FileIO
	opts.TableLoadingMode = options.FileIO
	// opts.NumCompactors = 20
	// opts.MaxTableSize = .25 * 1073741824 // .25GB
	opts.NumMemtables = 2
	opts.ValueLogFileSize = 1 * 1073741824 // 1GB
	opts.SyncWrites = false

	optsJSON, _ := json.Marshal(&opts)
	logrus.Infof("BADGER OPTIONS=%s", string(optsJSON))

	db, err := badger.Open(opts)
	if err != nil {
		panic(fmt.Sprintf("unable to open badger db; %s", err))
	}
	bt := &BadgerTest{db: db}
	go bt.filecounts(dir)
	return bt
}
func (b *BadgerTest) Start() {
	// One closer unit each for the writer, the GC loop, and the reader below.
	a := y.NewCloser(3)

	c := make(chan os.Signal, 1)
	signal.Notify(c, os.Interrupt)
	go func() {
		for range c {
			fmt.Println("Signaling...")
			a.SignalAndWait()
			fmt.Printf("Closing, error: %v\n", b.db.Close())
			os.Exit(0)
		}
	}()

	workers := 1 // the NewCloser count above assumes a single writer
	for i := 0; i < workers; i++ {
		go b.write(a)
	}
	go b.badgerGC(a)

	go func() {
		defer a.Done()
		tick := time.NewTicker(1 * time.Minute)
		for {
			select {
			case <-tick.C:
				b.readKey()
			case <-a.HasBeenClosed():
				fmt.Println("Returning from readKey")
				return
			}
		}
	}()

	// Uncomment to stop writing after 15 minutes:
	// go func() {
	// 	tick := time.NewTicker(15 * time.Minute)
	// 	for range tick.C {
	// 		stopWriting = true
	// 		return
	// 	}
	// }()
}
func (b *BadgerTest) Stop() {
	b.db.Close()
	logrus.Infof("%s shut down badger test", time.Now())
	time.Sleep(1 * time.Second)
}
func (b *BadgerTest) filecounts(dir string) {
	ticker := time.NewTicker(60 * time.Second)
	for range ticker.C {
		logFiles := int64(0)
		sstFiles := int64(0)
		_ = filepath.Walk(dir, func(path string, info os.FileInfo, err error) error {
			if err != nil {
				return err
			}
			if filepath.Ext(path) == ".sst" {
				sstFiles++
			}
			if filepath.Ext(path) == ".vlog" {
				logFiles++
			}
			return nil
		})
		logrus.Infof("%s updated gauges vlog=%d sst=%d", time.Now(), logFiles, sstFiles)
	}
}
func (b *BadgerTest) write(a *y.Closer) {
	defer a.Done()
	data := `{"randomstring":"6446D58D6DFAFD58586D3EA85A53F4A6B3CC057F933A22BB58E188A74AC8F663","refID":12495494,"testfield1234":"foo bar baz","date":"2018-01-01 12:00:00"}`
	batchSize := 1
	rows := []writable{}
	var cnt uint64
	for {
		select {
		case <-a.HasBeenClosed():
			logrus.Infof("%s stopping writing, let it catch up", time.Now())
			return
		default:
		}
		cnt++
		// Keys are 8-byte big-endian counters, so they sort in insertion order.
		// (An earlier variant used time.Now().UnixNano() as the key instead.)
		buf := make([]byte, 8)
		binary.BigEndian.PutUint64(buf, cnt)
		if !firstKeyStored {
			firstKey = buf
			firstKeyStored = true
			logrus.Infof("%s firstkey stored %x", time.Now(), firstKey)
		}
		rows = append(rows, writable{key: buf, value: []byte(data)})
		// Commit once the batch exceeds batchSize, i.e. every two rows here.
		if len(rows) > batchSize {
			b.saveRows(rows)
			rows = []writable{}
		}
	}
}
func (b *BadgerTest) saveRows(rows []writable) {
	// ttl := 30 * time.Minute
	ttl := 90 * time.Second
	txn := b.db.NewTransaction(true)
	for _, row := range rows {
		err := txn.SetWithTTL(row.key, row.value, ttl)
		if err == badger.ErrTxnTooBig {
			// Commit what we have and retry the entry in a fresh transaction.
			logrus.Infof("%s TX too big, committing...", time.Now())
			if cerr := txn.Commit(nil); cerr != nil {
				logrus.Errorf("unable to commit transaction; %s", cerr)
			}
			txn = b.db.NewTransaction(true)
			err = txn.SetWithTTL(row.key, row.value, ttl)
		}
		if err != nil {
			logrus.Errorf("unable to set key-val in transaction; %s", err)
		}
	}
	if err := txn.Commit(nil); err != nil {
		logrus.Errorf("unable to commit transaction; %s", err)
	}
}
func (b *BadgerTest) readKey() {
	// At some point our first key should be expired by its TTL, and Get
	// should start returning badger.ErrKeyNotFound.
	err := b.db.View(func(txn *badger.Txn) error {
		item, err := txn.Get(firstKey)
		if err != nil {
			return err
		}
		val, err := item.Value()
		if err != nil {
			return err
		}
		logrus.Infof("%s FIRSTKEY VALUE=%s", time.Now(), val)
		return nil
	})
	if err != nil {
		logrus.Errorf("%s FIRSTKEY unable to read first key from badger; %s", time.Now(), err)
	}
}
func (b *BadgerTest) badgerGC(a *y.Closer) {
	defer a.Done()
	_, lastVlogSize := b.db.Size()
	logrus.Infof("lastVLOGSize=%v", lastVlogSize)
	ticker := time.NewTicker(2 * time.Minute)
	const GB = int64(1 << 30)
	for {
		select {
		case <-a.HasBeenClosed():
			fmt.Println("Returning from GC")
			return
		case <-ticker.C:
			_, currentVlogSize := b.db.Size()
			// Only bother with GC once the value log has grown by at least 1GB.
			if currentVlogSize < lastVlogSize+GB {
				continue
			}
			logrus.Infof("%s CLEANUP starting to purge keys", time.Now())
			if err := b.db.PurgeOlderVersions(); err != nil {
				logrus.Errorf("%s badgerOps unable to purge older versions; %s", time.Now(), err)
			}
			logrus.Infof("%s CLEANUP PurgeOlderVersions completed", time.Now())
			// Run one GC pass per GB of growth, e.g. three passes for a 3.5GB increase.
			numTimes := (currentVlogSize - lastVlogSize) / GB
			for i := 0; i < int(numTimes); i++ {
				if err := b.db.RunValueLogGC(0.5); err != nil {
					logrus.Errorf("%s badgerOps unable to RunValueLogGC; %s", time.Now(), err)
				}
				logrus.Infof("%s CLEANUP RunValueLogGC completed iteration=%d", time.Now(), i)
			}
			_, lastVlogSize = b.db.Size()
		}
	}
}
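
Note: this targets the badger v1.x API of the time (SetWithTTL, Item.Value, Txn.Commit with a callback, PurgeOlderVersions); later releases changed or removed these calls, so pin an older badger release to run it as-is. The data directory is hardcoded to /home/mrjn/badgertest; point dir at a writable path on your machine, then `go run` the file and hit Ctrl-C to exercise the graceful-shutdown path.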