Skip to content

Instantly share code, notes, and snippets.

@melekes melekes/badger_db.go
Last active Jan 30, 2019

Embed
What would you like to do?
Alternative databases, which were used to benchmark Tendermint indexing
package db
import (
"bufio"
"io"
"os"
"sync"
"github.com/dgraph-io/badger"
)
func init() {
registerDBCreator(BadgerDBBackend, badgerDBCreator, true)
}
type Options badger.Options
func badgerDBCreator(dbName, dir string) (DB, error) {
return NewBadgerDB(dbName, dir)
}
var (
_KB = int64(1024)
_MB = 1024 * _KB
_GB = 1024 * _MB
)
// NewBadgerDB creates a Badger key-value store backed to the
// directory dir supplied. If dir does not exist, we create it.
func NewBadgerDB(dbName, dir string) (*BadgerDB, error) {
// BadgerDB doesn't expose a way for us to
// create a DB with the user's supplied name.
if err := os.MkdirAll(dir, 0755); err != nil {
return nil, err
}
opts := badger.DefaultOptions
// // Arbitrary size given that at Tendermint
// // we'll need huge KeyValue stores.
opts.ValueLogFileSize = 1 * _GB
// opts.SyncWrites = false
opts.Dir = dir
opts.ValueDir = dir
return NewBadgerDBWithOptions(opts)
}
// NewBadgerDBWithOptions creates a BadgerDB key value store
// gives the flexibility of initializing a database with the
// respective options.
func NewBadgerDBWithOptions(opts badger.Options) (*BadgerDB, error) {
db, err := badger.Open(opts)
if err != nil {
return nil, err
}
return &BadgerDB{db: db}, nil
}
type BadgerDB struct {
db *badger.DB
}
var _ DB = (*BadgerDB)(nil)
func (b *BadgerDB) Get(key []byte) []byte {
var val []byte
err := b.db.View(func(txn *badger.Txn) error {
item, err := txn.Get(key)
if err != nil {
return err
}
val, err = item.Value()
if err != nil {
return err
}
return nil
})
if err != nil {
// Unfortunate that Get can't return errors
// TODO: Propose allowing DB's Get to return errors too.
panic(err)
}
// var valueSave []byte
// err := valueItem.Value(func(origValue []byte) error {
// // TODO: Decide if we should just assign valueSave to origValue
// // since here we aren't dealing with iterators directly.
// valueSave = make([]byte, len(origValue))
// copy(valueSave, origValue)
// return nil
// })
// if err != nil {
// // TODO: ditto:: Propose allowing DB's Get to return errors too.
// panic(err)
// }
return val
}
func (b *BadgerDB) Has(key []byte) bool {
var found bool
err := b.db.View(func(txn *badger.Txn) error {
_, err := txn.Get(key)
if err != nil && err != badger.ErrKeyNotFound {
return err
}
found = (err != badger.ErrKeyNotFound)
return nil
})
if err != nil {
// Unfortunate that Get can't return errors
// TODO: Propose allowing DB's Get to return errors too.
panic(err)
}
return found
}
func (b *BadgerDB) Set(key, value []byte) {
err := b.db.Update(func(txn *badger.Txn) error {
return txn.Set(key, value)
})
if err != nil {
panic(err)
}
}
func (b *BadgerDB) SetSync(key, value []byte) {
err := b.db.Update(func(txn *badger.Txn) error {
return txn.Set(key, value)
})
if err != nil {
panic(err)
}
}
func (b *BadgerDB) Delete(key []byte) {
err := b.db.Update(func(txn *badger.Txn) error {
return txn.Delete(key)
})
if err != nil {
panic(err)
}
}
func (b *BadgerDB) DeleteSync(key []byte) {
err := b.db.Update(func(txn *badger.Txn) error {
return txn.Delete(key)
})
if err != nil {
panic(err)
}
}
func (b *BadgerDB) Close() {
if err := b.db.Close(); err != nil {
panic(err)
}
}
func (b *BadgerDB) Fprint(w io.Writer) {
// bIter := b.Iterator()
// defer bIter.Release()
// var bw *bufio.Writer
// if bbw, ok := w.(*bufio.Writer); ok {
// bw = bbw
// } else {
// bw = bufio.NewWriter(w)
// }
// defer bw.Flush()
// i := uint64(0)
// for bIter.rewind(); bIter.valid(); bIter.Next() {
// k, v := bIter.kv()
// fmt.Fprintf(bw, "[%X]:\t[%X]\n", k, v)
// i += 1
// if i%1024 == 0 {
// bw.Flush()
// i = 0
// }
// }
}
func (b *BadgerDB) Print() {
bw := bufio.NewWriter(os.Stdout)
b.Fprint(bw)
}
func (b *BadgerDB) Iterator(start, end []byte) Iterator {
// dbIter := b.db.NewIterator(badger.IteratorOptions{
// PrefetchValues: true,
// // Arbitrary PrefetchSize
// PrefetchSize: 10,
// })
// // Ensure that we are always at the zeroth item
// dbIter.Rewind()
return nil
}
func (b *BadgerDB) ReverseIterator(start, end []byte) Iterator {
return nil
}
func (b *BadgerDB) IteratorPrefix(prefix []byte) Iterator {
return b.Iterator(prefix, nil)
}
func (b *BadgerDB) Stats() map[string]string {
return nil
}
func (b *BadgerDB) NewBatch() Batch {
return &badgerDBBatch{db: b}
}
var _ Batch = (*badgerDBBatch)(nil)
type badgerDBBatch struct {
entriesMu sync.Mutex
entries []*badger.Entry
db *BadgerDB
}
func (bb *badgerDBBatch) Set(key, value []byte) {
bb.entriesMu.Lock()
bb.entries = append(bb.entries, &badger.Entry{
Key: key,
Value: value,
})
bb.entriesMu.Unlock()
}
// Unfortunately Badger doesn't have a batch delete
// The closest that we can do is do a delete from the DB.
// Hesitant to do DeleteAsync because that changes the
// expected ordering
func (bb *badgerDBBatch) Delete(key []byte) {
// bb.db.Delete(key)
}
// Write commits all batch sets to the DB
func (bb *badgerDBBatch) Write() {
bb.entriesMu.Lock()
entries := bb.entries
bb.entries = nil
bb.entriesMu.Unlock()
if len(entries) == 0 {
return
}
err := bb.db.db.Update(func(txn *badger.Txn) error {
for _, e := range entries {
if err := txn.SetEntry(e); err != nil {
return err
}
}
return nil
})
if err != nil {
panic(err)
}
// var buf *bytes.Buffer // It'll be lazily allocated when needed
// for i, entry := range entries {
// if err := entry.Error; err != nil {
// if buf == nil {
// buf = new(bytes.Buffer)
// }
// fmt.Fprintf(buf, "#%d: entry err: %v\n", i, err)
// }
// }
// if buf != nil {
// panic(string(buf.Bytes()))
// }
}
func (bb *badgerDBBatch) WriteSync() {
bb.entriesMu.Lock()
entries := bb.entries
bb.entries = nil
bb.entriesMu.Unlock()
if len(entries) == 0 {
return
}
err := bb.db.db.Update(func(txn *badger.Txn) error {
for _, e := range entries {
if err := txn.SetEntry(e); err != nil {
return err
}
}
return nil
})
if err != nil {
panic(err)
}
// var buf *bytes.Buffer // It'll be lazily allocated when needed
// for i, entry := range entries {
// if err := entry.Error; err != nil {
// if buf == nil {
// buf = new(bytes.Buffer)
// }
// fmt.Fprintf(buf, "#%d: entry err: %v\n", i, err)
// }
// }
// if buf != nil {
// panic(string(buf.Bytes()))
// }
}
type badgerDBIterator struct {
mu sync.RWMutex
iter *badger.Iterator
}
package db
import (
"bufio"
"fmt"
"io"
"os"
"path/filepath"
"sync"
bolt "go.etcd.io/bbolt"
)
var (
bucket = []byte("all")
)
func init() {
registerDBCreator(BBoltDBBackend, boltDBCreator, true)
}
func boltDBCreator(dbName, dir string) (DB, error) {
return NewBoltDB(dbName, dir)
}
// NewBoltDB creates a bbolt key-value store backed to the
// directory dir supplied. If dir does not exist, we create it.
func NewBoltDB(dbName, dir string) (*boltDB, error) {
// boltDB doesn't expose a way for us to
// create a DB with the user's supplied name.
if err := os.MkdirAll(dir, 0755); err != nil {
return nil, err
}
return NewBoltDBWithOptions(filepath.Join(dir, dbName), 0755, nil)
}
// NewBoltDBWithOptions creates a bbolt key value store
// gives the flexibility of initializing a database with the
// respective options.
func NewBoltDBWithOptions(path string, perm os.FileMode, opts *bolt.Options) (*boltDB, error) {
db, err := bolt.Open(path, perm, opts)
if err != nil {
return nil, err
}
// db.NoSync = true
db.Update(func(tx *bolt.Tx) error {
_, err := tx.CreateBucket(bucket)
if err != nil {
return fmt.Errorf("create bucket: %s", err)
}
return nil
})
return &boltDB{db: db}, nil
}
type boltDB struct {
db *bolt.DB
}
var _ DB = (*boltDB)(nil)
func (b *boltDB) Get(key []byte) []byte {
var val []byte
err := b.db.View(func(tx *bolt.Tx) error {
b := tx.Bucket(bucket)
val = b.Get(key)
return nil
})
if err != nil {
// Unfortunate that Get can't return errors
// TODO: Propose allowing DB's Get to return errors too.
panic(err)
}
// var valueSave []byte
// err := valueItem.Value(func(origValue []byte) error {
// // TODO: Decide if we should just assign valueSave to origValue
// // since here we aren't dealing with iterators directly.
// valueSave = make([]byte, len(origValue))
// copy(valueSave, origValue)
// return nil
// })
// if err != nil {
// // TODO: ditto:: Propose allowing DB's Get to return errors too.
// panic(err)
// }
return val
}
func (b *boltDB) Has(key []byte) bool {
var found bool
err := b.db.View(func(tx *bolt.Tx) error {
b := tx.Bucket(bucket)
if b.Get(key) != nil {
found = true
}
return nil
})
if err != nil {
// Unfortunate that Get can't return errors
// TODO: Propose allowing DB's Get to return errors too.
panic(err)
}
return found
}
func (b *boltDB) Set(key, value []byte) {
err := b.db.Update(func(tx *bolt.Tx) error {
b := tx.Bucket(bucket)
return b.Put(key, value)
})
if err != nil {
panic(err)
}
}
func (b *boltDB) SetSync(key, value []byte) {
err := b.db.Update(func(tx *bolt.Tx) error {
b := tx.Bucket(bucket)
return b.Put(key, value)
})
if err != nil {
panic(err)
}
}
func (b *boltDB) Delete(key []byte) {
err := b.db.Update(func(tx *bolt.Tx) error {
b := tx.Bucket(bucket)
return b.Delete(key)
})
if err != nil {
panic(err)
}
}
func (b *boltDB) DeleteSync(key []byte) {
err := b.db.Update(func(tx *bolt.Tx) error {
b := tx.Bucket(bucket)
return b.Delete(key)
})
if err != nil {
panic(err)
}
}
func (b *boltDB) Close() {
if err := b.db.Close(); err != nil {
panic(err)
}
}
func (b *boltDB) Fprint(w io.Writer) {
// bIter := b.Iterator()
// defer bIter.Release()
// var bw *bufio.Writer
// if bbw, ok := w.(*bufio.Writer); ok {
// bw = bbw
// } else {
// bw = bufio.NewWriter(w)
// }
// defer bw.Flush()
// i := uint64(0)
// for bIter.rewind(); bIter.valid(); bIter.Next() {
// k, v := bIter.kv()
// fmt.Fprintf(bw, "[%X]:\t[%X]\n", k, v)
// i += 1
// if i%1024 == 0 {
// bw.Flush()
// i = 0
// }
// }
}
func (b *boltDB) Print() {
bw := bufio.NewWriter(os.Stdout)
b.Fprint(bw)
}
func (b *boltDB) Iterator(start, end []byte) Iterator {
// dbIter := b.db.NewIterator(bolt.IteratorOptions{
// PrefetchValues: true,
// // Arbitrary PrefetchSize
// PrefetchSize: 10,
// })
// // Ensure that we are always at the zeroth item
// dbIter.Rewind()
return nil
}
func (b *boltDB) ReverseIterator(start, end []byte) Iterator {
return nil
}
func (b *boltDB) IteratorPrefix(prefix []byte) Iterator {
return b.Iterator(prefix, nil)
}
func (b *boltDB) Stats() map[string]string {
return nil
}
func (b *boltDB) NewBatch() Batch {
return &boltDBBatch{db: b}
}
var _ Batch = (*boltDBBatch)(nil)
type entry struct {
key []byte
value []byte
}
type boltDBBatch struct {
entriesMu sync.Mutex
entries []*entry
db *boltDB
}
func (bb *boltDBBatch) Set(key, value []byte) {
bb.entriesMu.Lock()
bb.entries = append(bb.entries, &entry{
key: key,
value: value,
})
bb.entriesMu.Unlock()
}
// Unfortunately bolt doesn't have a batch delete
// The closest that we can do is do a delete from the DB.
// Hesitant to do DeleteAsync because that changes the
// expected ordering
func (bb *boltDBBatch) Delete(key []byte) {
// bb.db.Delete(key)
}
// Write commits all batch sets to the DB
func (bb *boltDBBatch) Write() {
bb.entriesMu.Lock()
entries := bb.entries
bb.entries = nil
bb.entriesMu.Unlock()
if len(entries) == 0 {
return
}
err := bb.db.db.Update(func(tx *bolt.Tx) error {
b := tx.Bucket(bucket)
for _, e := range entries {
if err := b.Put(e.key, e.value); err != nil {
return err
}
}
return nil
})
if err != nil {
panic(err)
}
// var buf *bytes.Buffer // It'll be lazily allocated when needed
// for i, entry := range entries {
// if err := entry.Error; err != nil {
// if buf == nil {
// buf = new(bytes.Buffer)
// }
// fmt.Fprintf(buf, "#%d: entry err: %v\n", i, err)
// }
// }
// if buf != nil {
// panic(string(buf.Bytes()))
// }
}
func (bb *boltDBBatch) WriteSync() {
bb.entriesMu.Lock()
entries := bb.entries
bb.entries = nil
bb.entriesMu.Unlock()
if len(entries) == 0 {
return
}
err := bb.db.db.Update(func(tx *bolt.Tx) error {
b := tx.Bucket(bucket)
for _, e := range entries {
if err := b.Put(e.key, e.value); err != nil {
return err
}
}
return nil
})
if err != nil {
panic(err)
}
// var buf *bytes.Buffer // It'll be lazily allocated when needed
// for i, entry := range entries {
// if err := entry.Error; err != nil {
// if buf == nil {
// buf = new(bytes.Buffer)
// }
// fmt.Fprintf(buf, "#%d: entry err: %v\n", i, err)
// }
// }
// if buf != nil {
// panic(string(buf.Bytes()))
// }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.