Skip to content

Instantly share code, notes, and snippets.

@lestrrat
Created September 9, 2016 11:31
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save lestrrat/86f7f804d21d9065be8a2333db996639 to your computer and use it in GitHub Desktop.
Save lestrrat/86f7f804d21d9065be8a2333db996639 to your computer and use it in GitHub Desktop.
// TestParallelBatch fills a single leveldb.Batch from several goroutines at
// once, writes that batch to a freshly created on-disk database, and then
// reads every key back to check that each record carries the expected value.
//
// The test adds no locking of its own around Put, so it exercises whether
// Batch tolerates concurrent writers.
func TestParallelBatch(t *testing.T) {
	const records = 10

	// Scratch directory for the database; removed again when the test ends.
	tmpDir, err := ioutil.TempDir("", "leveldb-test-")
	if !assert.NoError(t, err, "ioutil.TempDir should succeed") {
		return
	}
	defer os.RemoveAll(tmpDir)

	db, err := leveldb.OpenFile(tmpDir, nil)
	if !assert.NoError(t, err, "leveldb.OpenFile should succeed") {
		return
	}
	// Deferred after RemoveAll, so the database is closed before its
	// directory is deleted.
	defer db.Close()

	var (
		batch   leveldb.Batch
		pending sync.WaitGroup
	)
	for i := 0; i < records; i++ {
		pending.Add(1)
		// The suffix is computed here and handed to the goroutine as an
		// argument, so every goroutine works on its own copy.
		go func(suffix string) {
			defer pending.Done()
			batch.Put([]byte("key"+suffix), []byte("value"+suffix))
		}(strconv.Itoa(i))
	}
	pending.Wait()

	if err := db.Write(&batch, nil); !assert.NoError(t, err, "db.Write should succeed") {
		return
	}

	// Every record queued above must now be readable with its own value.
	for i := 0; i < records; i++ {
		suffix := strconv.Itoa(i)
		got, err := db.Get([]byte("key"+suffix), nil)
		if !assert.NoError(t, err, "db.Get should succeed") {
			return
		}
		if want := []byte("value" + suffix); !assert.Equal(t, want, got, "value should match") {
			return
		}
	}
}
diff --git a/leveldb/batch.go b/leveldb/batch.go
index 5010067..8c2f6da 100644
--- a/leveldb/batch.go
+++ b/leveldb/batch.go
@@ -9,6 +9,7 @@ package leveldb
import (
"encoding/binary"
"fmt"
+ "sync"
"github.com/syndtr/goleveldb/leveldb/errors"
"github.com/syndtr/goleveldb/leveldb/memdb"
@@ -41,12 +42,14 @@ type BatchReplay interface {
// Batch is a write batch.
type Batch struct {
+ dataMutex sync.Mutex
data []byte
rLen, bLen int
seq uint64
sync bool
}
+// XXX touches b.data
func (b *Batch) grow(n int) {
off := len(b.data)
if off == 0 {
@@ -71,6 +74,9 @@ func (b *Batch) grow(n int) {
}
func (b *Batch) appendRec(kt keyType, key, value []byte) {
+ b.dataMutex.Lock()
+ defer b.dataMutex.Unlock()
+
n := 1 + binary.MaxVarintLen32 + len(key)
if kt == keyTypeVal {
n += binary.MaxVarintLen32 + len(value)
@@ -179,6 +185,9 @@ func (b *Batch) encode() []byte {
}
func (b *Batch) decode(prevSeq uint64, data []byte) error {
+ b.dataMutex.Lock()
+ defer b.dataMutex.Unlock()
+
if len(data) < batchHdrLen {
return newErrBatchCorrupted("too short")
}
@@ -199,6 +208,9 @@ func (b *Batch) decode(prevSeq uint64, data []byte) error {
...skipping...
"github.com/syndtr/goleveldb/leveldb/errors"
"github.com/syndtr/goleveldb/leveldb/memdb"
@@ -41,12 +42,14 @@ type BatchReplay interface {
// Batch is a write batch.
type Batch struct {
+ dataMutex sync.Mutex
data []byte
rLen, bLen int
seq uint64
sync bool
}
+// XXX touches b.data
func (b *Batch) grow(n int) {
off := len(b.data)
if off == 0 {
@@ -71,6 +74,9 @@ func (b *Batch) grow(n int) {
}
func (b *Batch) appendRec(kt keyType, key, value []byte) {
+ b.dataMutex.Lock()
+ defer b.dataMutex.Unlock()
+
n := 1 + binary.MaxVarintLen32 + len(key)
if kt == keyTypeVal {
n += binary.MaxVarintLen32 + len(value)
@@ -179,6 +185,9 @@ func (b *Batch) encode() []byte {
}
func (b *Batch) decode(prevSeq uint64, data []byte) error {
+ b.dataMutex.Lock()
+ defer b.dataMutex.Unlock()
+
if len(data) < batchHdrLen {
return newErrBatchCorrupted("too short")
}
@@ -199,6 +208,9 @@ func (b *Batch) decode(prevSeq uint64, data []byte) error {
}
func (b *Batch) decodeRec(f func(i int, kt keyType, key, value []byte) error) error {
+ b.dataMutex.Lock()
+ defer b.dataMutex.Unlock()
+
off := batchHdrLen
for i := 0; i < b.rLen; i++ {
if off >= len(b.data) {
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment