Skip to content

Instantly share code, notes, and snippets.

@adamcfraser
Created November 12, 2015 09:03
Show Gist options
  • Save adamcfraser/b174f9f543d2ca541dff to your computer and use it in GitHub Desktop.
Save adamcfraser/b174f9f543d2ca541dff to your computer and use it in GitHub Desktop.
Test to reproduce bulk op issue
package gocb
import (
"fmt"
"log"
"strings"
"sync"
"sync/atomic"
"testing"
"time"
)
// TestTimeoutHandling reproduces a hang seen when many goroutines issue bulk
// get operations concurrently. It requires a Couchbase server at
// http://localhost:8091 with a bucket named "test_bucket".
//
// The test:
//  1. Writes 1M docs to the bucket (1000 writer goroutines x 1000 docs each).
//  2. Starts an additional goroutine that loops, doing a simple single get
//     operation.
//  3. Starts multiple goroutines (maxGoroutines) to execute bulk get calls
//     (each call gets bulkGetSize docs). If one of these goroutines gets an
//     error in response to the bulk get call, it terminates.
//
// The test returns only once every bulk get goroutine has terminated.
//
// Observed results:
//
//	i. If maxGoroutines is low (<25), this runs without error at
//	   bulkGetSize=150, and runs the reads at about 70K ops/second on a local
//	   couchbase server.
//	ii. If maxGoroutines is a bit higher (50), this fails in the following way:
//	   - some of the bulk get goroutines get a timeout error, and terminate
//	     (this part is expected behaviour - would be the trigger for the
//	     client to reduce load)
//	   - the remaining bulk get ops hang, and never return
//	   - the single-get goroutine starts returning timeout errors
//	   - ops on the couchbase bucket drop to zero
func TestTimeoutHandling(t *testing.T) {
	const (
		maxGoroutines = 50   // number of concurrent bulk get goroutines
		bulkGetSize   = 150  // number of GetOps per bulk call
		writers       = 1000 // number of concurrent writer goroutines
		docsPerWriter = 1000 // docs written by each writer (writers*docsPerWriter = 1M)
	)

	server := "http://localhost:8091"
	cluster, err := Connect(server)
	if err != nil {
		// Fatal: continuing would dereference a nil cluster below.
		t.Fatalf("Could not connect to server: %v", err)
	}
	log.Println("Connected to server...")

	bucket, err := cluster.OpenBucket("test_bucket", "")
	if err != nil {
		// Fatal: every subsequent step needs a usable bucket.
		t.Fatalf("Could not open bucket: %v", err)
	}
	log.Println("Opened bucket...")

	someData := []byte("012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789")

	log.Println("Writing docs...")
	// Write 1M docs to the bucket, keyed "key_<writer>_<doc>".
	var wg sync.WaitGroup
	for i := 0; i < writers; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			for j := 0; j < docsPerWriter; j++ {
				key := fmt.Sprintf("key_%d_%d", i, j)
				// t.Errorf (not Fatalf) because FailNow must not be called
				// from a goroutine other than the one running the test.
				if _, err := bucket.Upsert(key, someData, uint32(0)); err != nil {
					t.Errorf("Upsert Error:%v", err)
				}
			}
		}(i)
	}
	wg.Wait()
	log.Println("Write complete. Starting readers...")

	var (
		activeReaders          int32
		failedReaders          int32
		bulkGetIssued          int32
		bulkGetSuccesses       int32
		simpleGetIssued        int32
		simpleGetSuccesses     int32
		simpleGetQueueOverflow int32
		simpleGetTimedOut      int32
	)

	// done is closed when the test returns so that the background single-get
	// and stats goroutines stop instead of leaking past the end of the test.
	done := make(chan struct{})
	defer close(done)

	// Start goroutine that just does single reads.
	go func() {
		for {
			select {
			case <-done:
				return
			default:
			}

			var result []byte
			atomic.AddInt32(&simpleGetIssued, 1)
			_, err := bucket.Get("key_0_0", &result)
			if err == nil {
				atomic.AddInt32(&simpleGetSuccesses, 1)
				continue
			}
			// Classify failures by message text; other errors are not counted.
			switch msg := err.Error(); {
			case strings.Contains(msg, "Queue overflow"):
				atomic.AddInt32(&simpleGetQueueOverflow, 1)
			case strings.Contains(msg, "timed out"):
				atomic.AddInt32(&simpleGetTimedOut, 1)
			}
		}
	}()

	// Start goroutines that issue bulk gets.
	var getWg sync.WaitGroup
	for i := 0; i < maxGoroutines; i++ {
		getWg.Add(1)
		atomic.AddInt32(&activeReaders, 1)
		go func(i int) {
			defer func() {
				atomic.AddInt32(&activeReaders, -1)
				getWg.Done()
			}()
			for {
				// Build []BulkOp for bulk get; each op gets its own value buffer.
				items := make([]BulkOp, 0, bulkGetSize)
				for j := 0; j < bulkGetSize; j++ {
					var value []byte
					key := fmt.Sprintf("key_%d_%d", i, j)
					items = append(items, &GetOp{Key: key, Value: &value})
				}

				// Issue bulk get; the first error terminates this reader.
				atomic.AddInt32(&bulkGetIssued, 1)
				if err := bucket.Do(items); err != nil {
					log.Printf("Bulk get error:%v", err)
					atomic.AddInt32(&failedReaders, 1)
					return
				}
				atomic.AddInt32(&bulkGetSuccesses, 1)
			}
		}(i)
	}

	// Active readers tracking: log the counters immediately, then every 2s.
	go func() {
		ticker := time.NewTicker(2 * time.Second)
		defer ticker.Stop()
		for {
			log.Printf("Readers (Active/Failed): (%d/%d)", atomic.LoadInt32(&activeReaders), atomic.LoadInt32(&failedReaders))
			log.Printf("Bulk Get calls (Issued/Succeeded): (%d/%d)", atomic.LoadInt32(&bulkGetIssued), atomic.LoadInt32(&bulkGetSuccesses))
			log.Printf("Simple Get calls (Issued/Succeeded/Overflows/Timeouts): (%d/%d/%d/%d)", atomic.LoadInt32(&simpleGetIssued), atomic.LoadInt32(&simpleGetSuccesses), atomic.LoadInt32(&simpleGetQueueOverflow), atomic.LoadInt32(&simpleGetTimedOut))
			select {
			case <-done:
				return
			case <-ticker.C:
			}
		}
	}()

	// Blocks until every bulk get goroutine has hit an error and exited.
	getWg.Wait()
	log.Println("All readers failed")
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment