Skip to content

Instantly share code, notes, and snippets.

@adamcfraser
Created October 21, 2019 20:25
Show Gist options
  • Save adamcfraser/448293166095daaa2222f493485d3d7b to your computer and use it in GitHub Desktop.
Save adamcfraser/448293166095daaa2222f493485d3d7b to your computer and use it in GitHub Desktop.
package main
import (
"encoding/json"
"fmt"
"io"
"log"
"strings"
"testing"
"time"
"github.com/couchbase/cbgt"
"github.com/couchbase/gocb"
"github.com/stretchr/testify/assert"
"gopkg.in/couchbaselabs/gocbconnstr.v1"
)
// TestCBGTClosePIndex is an integration test that starts a cbgt manager backed
// by a bucket-stored cfg, creates an import index whose pindexes use the
// DestTest stub, writes a document so the DCP feed has something to deliver,
// and then closes every pindex the manager currently knows about.
//
// It needs a Couchbase Server reachable on localhost (management port 8091)
// with a bucket and RBAC user both named "test_data_bucket".
func TestCBGTClosePIndex(t *testing.T) {
	server := "localhost"
	bucketname := "test_data_bucket"
	username := "test_data_bucket"
	password := "password"

	// serverURL in gocb format for gocbFeed
	connSpec, err := gocbconnstr.Parse(fmt.Sprintf("couchbase://%s", server))
	if err != nil {
		t.Fatalf("parsing connection string: %v", err)
	}
	serverURL := connSpec.String()

	// Setup failures are fatal: everything below dereferences bucket/cluster.
	cluster, bucket, err := GetBucketForTest(serverURL, bucketname, username, password)
	if err != nil {
		t.Fatalf("opening bucket %q: %v", bucketname, err)
	}
	// Deferred calls run last-in-first-out, so the bucket is closed before the
	// cluster, and both are released even when the test bails out early.
	defer cluster.Close()
	defer bucket.Close()

	// Remove the previous config, if it exists (allows rerunning the test without removing the previous node def from cfg)
	cfgKey := "_sync:cfg"
	// The value itself is not needed, only the CAS. Decoding into an
	// interface{} avoids depending on how the cfg document was encoded.
	var cfgVal interface{}
	cas, err := bucket.Get(cfgKey, &cfgVal)
	if err == nil {
		log.Printf("Removing existing bucket cfg")
		_, removeErr := bucket.Remove(cfgKey, cas)
		assert.NoError(t, removeErr)
	}

	uuid := cbgt.NewUUID()

	// Set up CFG
	cfgOptions := map[string]interface{}{
		"keyPrefix": "_sync:",
	}
	urls := []string{fmt.Sprintf("http://%s:%s@%s:8091", username, password, server)} // url in go-couchbase format for CfgCB
	cfgCB, err := cbgt.NewCfgCBEx(
		strings.Join(urls, ";"),
		bucketname,
		cfgOptions,
	)
	if err != nil {
		t.Fatalf("creating bucket-backed cfg: %v", err)
	}

	// Manager settings
	tags := []string{"feed", "janitor", "pindex", "planner"}
	weight := 1
	container := ""
	extras := ""
	// bindHttp: Used for REST binding (not needed by Sync Gateway), but also as a unique identifier in some spots
	bindHttp := uuid
	dataDir := ""

	// eventHandlers: SG doesn't currently do any processing on manager events:
	//   - OnRegisterPIndex
	//   - OnUnregisterPIndex
	//   - OnFeedError
	var eventHandlers cbgt.ManagerEventHandlers

	// Specify one feed per pindex
	options := make(map[string]string)
	options[cbgt.FeedAllotmentOption] = cbgt.FeedAllotmentOnePerPIndex
	options["managerLoadDataDir"] = "false"

	// Creates a new cbgt manager.
	mgr := cbgt.NewManagerEx(
		cbgt.VERSION, // cbgt metadata version
		cfgCB,
		uuid,
		tags,
		container,
		weight,
		extras,
		bindHttp,
		dataDir,
		serverURL,
		eventHandlers,
		options)
	if err = mgr.Start(cbgt.NODE_DEFS_WANTED); err != nil {
		t.Fatalf("starting cbgt manager: %v", err)
	}

	// Register index type
	RegisterPindexImplForTest(bucketname)

	// Create PIndex
	sourceType := "gocb"
	sourceParams, err := CbgtFeedParamsForTest(username, password)
	if err != nil {
		t.Fatalf("building feed params: %v", err)
	}

	planParams := cbgt.PlanParams{
		MaxPartitionsPerPIndex: 16, // num vbuckets per Pindex.  Multiple Pindexes could be assigned per node.
		NumReplicas:            0,  // No replicas required for SG sharded feed
	}

	// TODO: If this isn't well-formed JSON, cbgt emits errors when opening locally persisted pindex files.  Review
	//       how this can be optimized if we're not actually using it in the indexImpl
	indexParams := `{"name": "` + bucketname + `"}`
	indexName := bucketname + "_import"
	indexType := "syncGateway-import-" + bucketname

	// The lookup error must be checked here; it would otherwise be silently
	// overwritten by the CreateIndex result below.
	_, previousIndexUUID, err := GetCBGTIndexUUIDForTest(mgr, indexName)
	if err != nil {
		t.Fatalf("looking up existing index %q: %v", indexName, err)
	}

	err = mgr.CreateIndex(
		sourceType,        // sourceType
		bucketname,        // sourceName
		bucket.UUID(),     // sourceUUID
		sourceParams,      // sourceParams
		indexType,         // indexType
		indexName,         // indexName
		indexParams,       // indexParams
		planParams,        // planParams
		previousIndexUUID, // prevIndexUUID
	)
	assert.NoError(t, err)
	mgr.Kick("NewIndexesCreated")

	// Write something to the bucket to make sure the Dest is working.
	// Upsert is used so the write succeeds whether or not testKey already exists.
	_, err = bucket.Upsert("testKey", []byte(`{"test":123}`), 0)
	assert.NoError(t, err)

	// wait for dest processing before attempting to close PIndex
	time.Sleep(1 * time.Second)

	// Attempt to close Pindexes
	_, pindexes := mgr.CurrentMaps()
	for _, pindex := range pindexes {
		assert.NoError(t, mgr.ClosePIndex(pindex))
	}
}
// GetBucketForTest connects to the cluster at connStr, authenticates with the
// supplied username/password, and opens bucketName (with an empty bucket
// password). On success the caller owns both returned handles and is
// responsible for closing them. On failure both handles are nil and the
// first error encountered is returned.
func GetBucketForTest(connStr, bucketName, username, password string) (*gocb.Cluster, *gocb.Bucket, error) {
	cluster, err := gocb.Connect(connStr)
	if err != nil {
		// Without this check a failed connect would dereference a nil cluster below.
		return nil, nil, err
	}

	authErr := cluster.Authenticate(gocb.PasswordAuthenticator{
		Username: username,
		Password: password,
	})
	if authErr != nil {
		// Best-effort cleanup; the authentication error is the one worth reporting.
		cluster.Close()
		return nil, nil, authErr
	}

	bucket, err := cluster.OpenBucket(bucketName, "")
	if err != nil {
		// Best-effort cleanup; the open error is the one worth reporting.
		cluster.Close()
		return nil, nil, err
	}
	return cluster, bucket, nil
}
// CbgtFeedParamsForTest returns the JSON-encoded DCP feed parameters used as
// the index sourceParams: cbgt's default DCP feed params with the given basic
// auth credentials applied and xattr inclusion switched on.
func CbgtFeedParamsForTest(username, password string) (string, error) {
	params := cbgt.NewDCPFeedParams()
	params.AuthUser, params.AuthPassword = username, password
	params.IncludeXAttrs = true

	encoded, marshalErr := json.Marshal(params)
	if marshalErr != nil {
		return "", marshalErr
	}
	return string(encoded), nil
}
// GetCBGTIndexUUIDForTest refreshes the manager's index definitions and looks
// up indexName among them. It reports whether a definition with that name
// exists and, if so, its UUID; previousUUID is empty when the index is absent
// or when fetching the definitions fails.
func GetCBGTIndexUUIDForTest(manager *cbgt.Manager, indexName string) (exists bool, previousUUID string, err error) {
	_, defsByName, err := manager.GetIndexDefs(true)
	if err != nil {
		return false, "", err
	}

	if def, found := defsByName[indexName]; found {
		return true, def.UUID, nil
	}
	return false, "", nil
}
// RegisterPindexImplForTest registers the test pindex implementation with cbgt
// under an index type name derived from bucketname.
func RegisterPindexImplForTest(bucketname string) {
	implType := &cbgt.PIndexImplType{
		New:       NewTestPIndexImpl,
		Open:      OpenTestPIndexImpl,
		OpenUsing: OpenUsingTestPIndexImpl,
		Description: "general/syncGateway-import " +
			" - import processing for shared bucket access",
	}

	// Since RegisterPIndexImplType is a global var, index type needs to be database-scoped to support
	// running multiple databases.  This avoids requiring a database lookup based in indexParams at PIndex creation
	// time, which introduces deadlock potential
	cbgt.RegisterPIndexImplType("syncGateway-import-"+bucketname, implType)
}
// NewTestPIndexImpl is the "New" callback registered for the test index type.
// It logs the call and hands back a fresh DestTest; the PIndexImpl result is
// always nil and every argument is ignored.
func NewTestPIndexImpl(indexType, indexParams, path string, restart func()) (cbgt.PIndexImpl, cbgt.Dest, error) {
	log.Printf("NewTestPIndexImpl")

	var impl cbgt.PIndexImpl // deliberately left nil
	dest := NewDestTest()
	return impl, dest, nil
}
// OpenTestPIndexImpl is the "Open" callback registered for the test index
// type. It logs the call and hands back a fresh DestTest; the PIndexImpl
// result is always nil and every argument is ignored.
func OpenTestPIndexImpl(indexType, path string, restart func()) (cbgt.PIndexImpl, cbgt.Dest, error) {
	log.Printf("OpenTestPIndexImpl")

	var impl cbgt.PIndexImpl // deliberately left nil
	dest := NewDestTest()
	return impl, dest, nil
}
// OpenUsingTestPIndexImpl is the "OpenUsing" callback registered for the test
// index type. indexParams is not used; the call is forwarded unchanged to
// OpenTestPIndexImpl.
func OpenUsingTestPIndexImpl(indexType, path, indexParams string, restart func()) (cbgt.PIndexImpl, cbgt.Dest, error) {
	impl, dest, err := OpenTestPIndexImpl(indexType, path, restart)
	return impl, dest, err
}
// NewDestTest returns a DestTest whose opaque-metadata map is allocated and
// empty.
func NewDestTest() *DestTest {
	d := new(DestTest)
	d.meta = map[string][]byte{}
	return d
}
// DestTest is a stub destination for pindex feed events. Every callback logs
// that it was invoked and reports success; the only state kept is the opaque
// metadata stored per partition via OpaqueSet.
//
// NOTE(review): meta is accessed without synchronization — confirm that cbgt
// never invokes OpaqueGet/OpaqueSet concurrently on the same Dest.
type DestTest struct {
	// meta holds the most recent opaque value stored for each partition.
	meta map[string][]byte
}

// Close logs the call and returns nil.
func (d *DestTest) Close() error {
	log.Printf("DestTest Close")
	return nil
}

// DataUpdate logs the key of the update and returns nil; the payload is
// discarded.
func (d *DestTest) DataUpdate(partition string, key []byte, seq uint64,
	val []byte, cas uint64, extrasType cbgt.DestExtrasType, extras []byte) error {
	log.Printf("DestTest DataUpdate %s", key)
	return nil
}

// DataUpdateEx logs the key of the update and returns nil; the payload is
// discarded.
func (d *DestTest) DataUpdateEx(partition string, key []byte, seq uint64, val []byte,
	cas uint64, extrasType cbgt.DestExtrasType, req interface{}) error {
	log.Printf("DestTest DataUpdateEx %s", key)
	return nil
}

// DataDelete logs the key of the deletion and returns nil.
func (d *DestTest) DataDelete(partition string, key []byte, seq uint64,
	cas uint64,
	extrasType cbgt.DestExtrasType, extras []byte) error {
	log.Printf("DestTest DataDelete %s", key)
	return nil
}

// DataDeleteEx logs the key of the deletion and returns nil.
func (d *DestTest) DataDeleteEx(partition string, key []byte, seq uint64,
	cas uint64, extrasType cbgt.DestExtrasType, req interface{}) error {
	log.Printf("DestTest DataDeleteEx %s", key)
	return nil
}

// SnapshotStart logs the call and returns nil; the snapshot bounds are ignored.
func (d *DestTest) SnapshotStart(partition string,
	snapStart, snapEnd uint64) error {
	log.Printf("DestTest SnapshotStart")
	return nil
}

// OpaqueGet returns the value last stored for partition by OpaqueSet, or nil
// if nothing has been stored. lastSeq is always 0 and err is always nil.
func (d *DestTest) OpaqueGet(partition string) (value []byte, lastSeq uint64, err error) {
	return d.meta[partition], 0, nil
}

// OpaqueSet stores value as the opaque metadata for partition, replacing any
// previous value, and returns nil.
func (d *DestTest) OpaqueSet(partition string, value []byte) error {
	// Allocate lazily so a zero-value DestTest (one not built by NewDestTest)
	// does not panic on a write to a nil map.
	if d.meta == nil {
		d.meta = make(map[string][]byte)
	}
	d.meta[partition] = value
	return nil
}

// Rollback logs the partition and rollback sequence and returns nil; no state
// is rewound.
func (d *DestTest) Rollback(partition string, rollbackSeq uint64) error {
	log.Printf("DestTest Rollback %v %v", partition, rollbackSeq)
	return nil
}

// RollbackEx logs the partition and rollback sequence and returns nil;
// vbucketUUID is ignored and no state is rewound.
func (d *DestTest) RollbackEx(partition string, vbucketUUID uint64, rollbackSeq uint64) error {
	log.Printf("DestTest RollbackEx %v %v", partition, rollbackSeq)
	return nil
}

// ConsistencyWait logs the partition and returns nil immediately, without
// waiting on anything or reading cancelCh.
func (d *DestTest) ConsistencyWait(partition, partitionUUID string,
	consistencyLevel string, consistencySeq uint64, cancelCh <-chan bool) error {
	log.Printf("DestTest ConsistencyWait %v", partition)
	return nil
}

// Count logs the pindex name and always reports a count of 0 with a nil error.
// pindex must be non-nil.
func (d *DestTest) Count(pindex *cbgt.PIndex, cancelCh <-chan bool) (uint64, error) {
	log.Printf("DestTest Count %v", pindex.Name)
	return 0, nil
}

// Query logs the pindex name and returns nil without writing anything to w.
// pindex must be non-nil.
func (d *DestTest) Query(pindex *cbgt.PIndex, req []byte, w io.Writer,
	cancelCh <-chan bool) error {
	log.Printf("DestTest Query %v", pindex.Name)
	return nil
}

// Stats logs the call and returns nil without writing anything to the writer.
func (d *DestTest) Stats(io.Writer) error {
	log.Printf("DestTest Stats")
	return nil
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment