Skip to content

Instantly share code, notes, and snippets.

@campoy
Created June 21, 2019 19:41
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save campoy/a494cb382aeb487063440f99fde55e09 to your computer and use it in GitHub Desktop.
Save campoy/a494cb382aeb487063440f99fde55e09 to your computer and use it in GitHub Desktop.
Showing how to batch your writes automatically with WriteBatch
package main
import (
	"errors"
	"log"
	"math/rand"
	"sync"
	"time"

	"github.com/dgraph-io/badger/v2"
)
func main() {
rand.Seed(time.Now().Unix())
db, err := newBatchedDB("./data", time.Second)
if err != nil {
log.Fatal(err)
}
defer db.Close()
for i := 0; i < 1000; i++ {
log.Printf("sending write")
time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
if err := db.Set(randBytes(), randBytes()); err != nil {
log.Fatalf("could not set: %v", err)
}
}
}
// randBytes returns a fresh 32-byte slice filled from math/rand.
// It panics if the random source reports an error.
func randBytes() []byte {
	const size = 32

	buf := make([]byte, size)
	if _, err := rand.Read(buf); err != nil {
		panic("unexpected error while reading from rand")
	}
	return buf
}
// batchedDB embeds a badger database and funnels writes made through Set
// into a WriteBatch that a background goroutine flushes periodically.
type batchedDB struct {
	// DB is the underlying badger database; its methods are promoted.
	*badger.DB

	// mu guards wb. Set holds the read lock while adding to the batch;
	// flush holds the write lock while flushing and replacing it.
	mu sync.RWMutex
	// wb is the batch currently accepting writes. The final flush sets it
	// to nil.
	wb *badger.WriteBatch

	// quit is closed by Close to ask the background loop to stop.
	quit chan bool
	// done is closed by the background loop once its last flush finished.
	done chan bool
}
// newBatchedDB opens a badger database stored under path and starts a
// background goroutine that flushes batched writes every d.
//
// It returns the error from badger.Open unchanged if the database cannot be
// opened. The caller must call Close on the result to flush pending writes
// and stop the goroutine.
func newBatchedDB(path string, d time.Duration) (*batchedDB, error) {
	// In badger v2 DefaultOptions is a function: it takes the directory and
	// uses it for both the key directory and the value directory.
	db, err := badger.Open(badger.DefaultOptions(path))
	if err != nil {
		return nil, err
	}

	bdb := &batchedDB{
		DB:   db,
		wb:   db.NewWriteBatch(),
		quit: make(chan bool),
		done: make(chan bool),
	}
	go bdb.loop(d)
	return bdb, nil
}
// loop flushes the current write batch every d until db.quit is closed.
// On quit it performs one last flush, closes db.done to signal completion,
// and returns. It is meant to run in its own goroutine.
func (db *batchedDB) loop(d time.Duration) {
	// Use a Ticker that is stopped on return rather than time.Tick, whose
	// ticker cannot be stopped and therefore outlives this goroutine.
	//
	// For a non-positive d the channel stays nil: a nil channel never becomes
	// ready in select, so only the final flush happens. time.NewTicker would
	// panic on such a value.
	var tick <-chan time.Time
	if d > 0 {
		ticker := time.NewTicker(d)
		defer ticker.Stop()
		tick = ticker.C
	}

	for {
		select {
		case <-tick:
			db.flush(false)
		case <-db.quit:
			db.flush(true)
			close(db.done)
			return
		}
	}
}
// flush writes the current batch to badger while holding the write lock, so
// no Set call can touch the batch in the meantime. A flush error is only
// logged. Afterwards the batch is replaced with a fresh one, or with nil when
// last is true.
func (db *batchedDB) flush(last bool) {
	db.mu.Lock()
	defer db.mu.Unlock()

	log.Printf("flushing writes to badger")
	err := db.wb.Flush()
	if err != nil {
		// Not the best error handling, but you could have some kind of callback too.
		log.Printf("could not flush: %v", err)
	}

	if !last {
		db.wb = db.NewWriteBatch()
		return
	}
	db.wb = nil
}
// Close signals the background loop to stop, waits until it has finished its
// final flush, and then closes the embedded badger database, returning that
// call's error.
func (db *batchedDB) Close() error {
	// Closing quit wakes the loop; done is closed once the last flush ran.
	close(db.quit)
	<-db.done

	err := db.DB.Close()
	return err
}
// errBatchClosed is returned by Set when no write batch is available.
var errBatchClosed = errors.New("batched db is closed")

// Set adds the key/value pair to the current write batch; the pair is
// written to badger at the next flush.
//
// It returns errBatchClosed when there is no current batch, which is the
// state the final flush leaves behind, instead of dereferencing a nil batch.
// Otherwise it returns the error from the batch's Set.
func (db *batchedDB) Set(k, v []byte) error {
	// The read lock lets concurrent Set calls proceed together while keeping
	// flush from swapping the batch underneath them.
	db.mu.RLock()
	defer db.mu.RUnlock()

	if db.wb == nil {
		return errBatchClosed
	}
	return db.wb.Set(k, v)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment