Skip to content

Instantly share code, notes, and snippets.

@abhinavdangeti
Last active December 17, 2019 23:29
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save abhinavdangeti/cb080931b0f70059e567d68bf5ce17da to your computer and use it in GitHub Desktop.
Save abhinavdangeti/cb080931b0f70059e567d68bf5ce17da to your computer and use it in GitHub Desktop.
Authentication failing for select_bucket with gocbcore.v8
// Steps to run:
// Spin up a Couchbase cluster (1 node: DATA + SEARCH), create the travel-sample bucket, and adjust the host and port info in agent.go.
// 1. export CBAUTH_REVRPC_URL=http://Administrator:asdasd@localhost:9000/cbft
// 2. go run agent.go
package main
import (
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"strconv"
"strings"
"time"
"github.com/couchbase/cbauth"
"github.com/couchbase/cbgt"
"github.com/couchbase/gocbcore"
//"gopkg.in/couchbase/gocbcore.v7"
)
// main drives the reproduction end to end and prints a numbered trace of
// every step so that the failing stage is obvious from the output:
//
//  1. build a gocbcore agent config from the server connection string,
//  2. create the agent,
//  3. list the FTS endpoints known to the agent,
//  4. fetch the index definitions from the first FTS endpoint,
//  5. fetch the bleveMaxResultWindow manager option from the same endpoint,
//  6. fetch vbucket-details stats,
//  7. report completion.
//
// Any failing step prints its error and ends the run.
func main() {
	serverIn := "http://127.0.0.1:9000"
	sourceName := "travel-sample"

	server, _, bucketName := cbgt.CouchbaseParseSourceName(serverIn, "default", sourceName)
	fmt.Println("SERVER:", server, "BUCKET:", bucketName)

	config := &gocbcore.AgentConfig{
		BucketName:     bucketName,
		ConnectTimeout: 6000 * time.Millisecond,
		Auth:           &Authenticator{},
	}

	err := config.FromConnStr(server)
	fmt.Println("1. FROMCONNSTR(SERVER) err:", err)
	if err != nil {
		return
	}

	agent, err := gocbcore.CreateAgent(config)
	fmt.Println("2. CREATEAGENT err:", err)
	if err != nil {
		// The error has already been reported by the line above.
		return
	}
	defer func() {
		agent.Close()
	}()

	ftsEndPoints := agent.FtsEps()
	fmt.Println("3. FTS ENDPOINTS:", ftsEndPoints)
	if len(ftsEndPoints) == 0 {
		// Steps 4 and 5 index ftsEndPoints[0]; without this guard an empty
		// endpoint list would panic instead of reporting the problem.
		fmt.Println("\tNO FTS ENDPOINTS AVAILABLE")
		return
	}

	defs, err := retrieveIndexDefs(agent, ftsEndPoints[0])
	fmt.Printf("4. INDEXDEFS %#v\n", defs)
	if err != nil {
		fmt.Println("\tINDEXDEFS FAILED", err)
		return
	}

	bleveMaxResultWindow := fetchBleveMaxResultWindow(agent, ftsEndPoints[0])
	fmt.Println("5. BLEVEMAXRESULTWINDOW:", bleveMaxResultWindow)

	stats, err := fetchStats(agent)
	fmt.Println("6. STATS:", stats)
	if err != nil {
		fmt.Println("\tSTATS FAILED", err)
		return
	}

	fmt.Println("7. DONE")
}
// Authenticator supplies credentials to the gocbcore agent by delegating to
// cbauth. It carries no state of its own.
type Authenticator struct{}

// Credentials looks up the username/password pair for the endpoint named in
// req via cbauth.GetMemcachedServiceAuth, after stripping any "http://" or
// "https://" scheme prefix from the endpoint.
//
// The lookup result is printed for debugging. On failure the error is
// returned together with a single zero-valued pair; on success the single
// pair carries the retrieved username and password.
func (a *Authenticator) Credentials(req gocbcore.AuthCredsRequest) (
	[]gocbcore.UserPassPair, error) {
	// cbauth is queried with a bare host:port, so drop the URL scheme.
	hostPort := strings.TrimPrefix(req.Endpoint, "http://")
	hostPort = strings.TrimPrefix(hostPort, "https://")

	user, pass, err := cbauth.GetMemcachedServiceAuth(hostPort)
	fmt.Println("\tAUTHENTICATOR", user, pass, err)

	creds := []gocbcore.UserPassPair{{}}
	if err != nil {
		return creds, err
	}

	creds[0].Username = user
	creds[0].Password = pass
	return creds, nil
}
// retrieveIndexDefs issues a GET against node + "/api/cfg" using the agent's
// HTTP client and returns the "indexDefs" section of the JSON response.
//
// An error is returned when the agent has no HTTP client, cbgt.CBAuthURL
// rejects the URL, the request fails, the response status is not 200, the
// body cannot be read or decoded, or the decoded "status" field is not "ok".
func retrieveIndexDefs(agent *gocbcore.Agent, node string) (*cbgt.IndexDefs, error) {
	client := agent.HttpClient()
	if client == nil {
		return nil, fmt.Errorf("retrieveIndexDefs, client not available")
	}
	fmt.Printf("\tHTTPCLIENT: %#v\n", client)

	cfgURL, err := cbgt.CBAuthURL(node + "/api/cfg")
	if err != nil {
		return nil, err
	}

	resp, err := client.Get(cfgURL)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()

	if code := resp.StatusCode; code != http.StatusOK {
		return nil, fmt.Errorf("retrieveIndexDefs, resp status code: %v", code)
	}

	raw, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		return nil, err
	}

	// Only the two fields this function needs are decoded.
	var cfg struct {
		IndexDefs *cbgt.IndexDefs `json:"indexDefs"`
		Status    string          `json:"status"`
	}
	if err = json.Unmarshal(raw, &cfg); err != nil {
		return nil, err
	}

	if cfg.Status == "ok" {
		return cfg.IndexDefs, nil
	}
	return nil, fmt.Errorf("retrieveIndexDefs status error,"+
		" body: %+v", cfg)
}
// fetchBleveMaxResultWindow issues a GET against node + "/api/manager" using
// the agent's HTTP client and returns the "bleveMaxResultWindow" entry found
// under mgr.options in the JSON response.
//
// The lookup is best-effort: the empty string is returned when the agent has
// no HTTP client, the URL cannot be built, the request fails, the response
// status is not 200, the body cannot be read, or the payload does not have
// the expected shape.
func fetchBleveMaxResultWindow(agent *gocbcore.Agent, node string) string {
	httpClient := agent.HttpClient()
	if httpClient == nil {
		return ""
	}

	urlStr, err := cbgt.CBAuthURL(node + "/api/manager")
	if err != nil {
		return ""
	}

	resp, err := httpClient.Get(urlStr)
	if err != nil {
		return ""
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		return ""
	}

	bodyBuf, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		return ""
	}

	return parseBleveMaxResultWindow(bodyBuf)
}

// parseBleveMaxResultWindow extracts mgr.options.bleveMaxResultWindow from an
// "/api/manager" response body, returning "" when the body is not valid JSON,
// "status" is not the string "ok", or any level of the path is missing or of
// an unexpected type.
func parseBleveMaxResultWindow(bodyBuf []byte) string {
	var expect map[string]interface{}
	if err := json.Unmarshal(bodyBuf, &expect); err != nil {
		return ""
	}

	// Every assertion uses the comma-ok form: a missing key yields a nil
	// interface, and a bare assertion on it would panic.
	if status, ok := expect["status"].(string); !ok || status != "ok" {
		return ""
	}

	mgr, ok := expect["mgr"].(map[string]interface{})
	if !ok {
		return ""
	}

	options, ok := mgr["options"].(map[string]interface{})
	if !ok {
		return ""
	}

	// A failed assertion leaves window as the zero value "".
	window, _ := options["bleveMaxResultWindow"].(string)
	return window
}
// fetchStats requests the "vbucket-details" stats through the agent and
// collects, for vbuckets "0" through "7", the UUID and high sequence number
// of every vbucket that a responding server reports as "active".
//
// The result is keyed by vbucket id. Servers that reported an error or no
// stats are skipped, as are vbuckets whose uuid or high_seqno entry is
// missing or whose high_seqno does not parse as an unsigned integer.
//
// The call waits up to 10 seconds for the stats callback. On timeout the
// operation is cancelled and gocbcore.ErrTimeout is returned; if the
// cancellation does not take effect, the callback's outcome is awaited and
// returned instead.
func fetchStats(agent *gocbcore.Agent) (map[string]cbgt.UUIDSeq, error) {
	seqs := map[string]cbgt.UUIDSeq{}

	// Buffered so the callback can always deliver its result without
	// blocking, even when nobody is receiving any more.
	done := make(chan error, 1)

	onStats := func(resp *gocbcore.StatsResult, er error) {
		if resp == nil || er != nil {
			done <- er
			return
		}

		for _, node := range resp.Servers {
			if node.Error != nil || len(node.Stats) == 0 {
				continue
			}

			for _, vbid := range []string{"0", "1", "2", "3", "4", "5", "6", "7"} {
				key := "vb_" + vbid

				// A missing entry reads as "", which is not "active" either.
				if node.Stats[key] != "active" {
					continue
				}

				uuid, haveUUID := node.Stats[key+":uuid"]
				if !haveUUID {
					continue
				}

				seqStr, haveSeq := node.Stats[key+":high_seqno"]
				if !haveSeq {
					continue
				}

				seq, perr := strconv.ParseUint(seqStr, 10, 64)
				if perr != nil {
					continue
				}

				seqs[vbid] = cbgt.UUIDSeq{
					UUID: uuid,
					Seq:  seq,
				}
			}
		}

		done <- nil
	}

	op, err := agent.StatsEx(gocbcore.StatsOptions{Key: "vbucket-details"}, onStats)
	if err != nil {
		return nil, err
	}

	tmr := gocbcore.AcquireTimer(10 * time.Second)

	select {
	case cbErr := <-done:
		gocbcore.ReleaseTimer(tmr, false)
		return seqs, cbErr
	case <-tmr.C:
		gocbcore.ReleaseTimer(tmr, true)
	}

	// Timed out: try to cancel the in-flight operation.
	if op.Cancel() {
		return nil, gocbcore.ErrTimeout
	}

	// The cancel did not take effect, so wait for the callback's result.
	cbErr := <-done
	return seqs, cbErr
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment