Skip to content

Instantly share code, notes, and snippets.

@MJDSys MJDSys/riak_key_lost.go
Last active Dec 19, 2015

Embed
What would you like to do?
Sample Riak program that has disappearing keys as of version 1.4
package main
import (
"flag"
"fmt"
riak "github.com/tpjg/goriakpbc"
"os"
"strconv"
"sync"
)
// do_keys is the number of keys written in the insert phase and read back in
// the fetch phase; keys are the decimal strings "0" .. do_keys-1.
const do_keys = 10000

// Command-line flags controlling the run.
var (
	// quit_early makes the program exit with status 1 right after the fetch
	// phase when at least one fetch failed, instead of going on to delete.
	quit_early = flag.Bool("quit_early", false, "Quit early if any fetches fail in the fetch stage")

	// r_val is passed as the "r" option of the fetch that precedes each
	// store in the insert phase.
	r_val = flag.Int("r", 2, "R value to use for fetches during the insert phase")
)
// main runs the key-loss reproduction against a Riak node at localhost:9000.
//
// It works in three phases, each fanned out over one goroutine per key and
// joined with a WaitGroup:
//
//  1. Insert: fetch every key "0".."do_keys-1" (with the configured "r"
//     value), remember which fetches came back without a vclock, then store
//     the object.
//  2. Fetch: read every key back, reporting failed fetches and siblings
//     together with whether a vclock had been seen during the insert phase.
//  3. Delete: list the bucket's keys and destroy each object.
//
// Any unexpected client error panics. With -quit_early the program exits with
// status 1 after phase 2 if any fetch failed.
func main() {
	flag.Parse()

	// Connect + setup bucket
	con := riak.NewClientPool("localhost:9000", 100)
	if err := con.Connect(); err != nil {
		panic(err)
	}
	bucket, err := con.NewBucket("test_bucket_no_one_has")
	if err != nil {
		panic(err)
	}
	if err := bucket.SetAllowMult(true); err != nil {
		panic(err)
	}

	var (
		wg sync.WaitGroup
		// mu guards noVclock while the insert goroutines run and dead while
		// the fetch goroutines run. Between phases wg.Wait() orders the
		// accesses, so the main goroutine reads them without locking.
		mu sync.Mutex
	)
	// noVclock[i] is true when the pre-insert fetch of key i had no vclock.
	noVclock := make(map[int]bool)

	// Ok, first insert 10000 items.
	wg.Add(do_keys)
	for i := 0; i < do_keys; i++ {
		go func(i int) {
			defer wg.Done()
			item, err := bucket.Get(strconv.Itoa(i), map[string]uint32{"r": uint32(*r_val)})
			// Only a nil object is treated as fatal: the fetch is expected
			// to report an error for keys that do not exist yet while still
			// handing back an object that can be stored.
			if item == nil {
				panic(err)
			}
			if len(item.Vclock) == 0 {
				mu.Lock()
				noVclock[i] = true
				mu.Unlock()
			}
			item.Data = []byte("ASDF")
			if err := item.Store(); err != nil {
				panic(err)
			}
		}(i)
	}
	wg.Wait()
	fmt.Println("Done insert")
	fmt.Println("No vclocks in", len(noVclock), "items")

	// Verify items exist
	dead := false
	wg.Add(do_keys)
	for i := 0; i < do_keys; i++ {
		go func(i int) {
			defer wg.Done()
			item, err := bucket.Get(strconv.Itoa(i))
			switch {
			case err != nil:
				// The object may be nil when the fetch fails outright, so
				// guard the vclock check instead of dereferencing blindly.
				foundNow := item != nil && len(item.Vclock) != 0
				fmt.Printf("Failed to fetch item %v err %s (vclock found during insert? %v, or found now? %v)\n", i, err, !noVclock[i], foundNow)
				mu.Lock()
				dead = true
				mu.Unlock()
			case item.Conflict():
				// noVclock records the *absence* of a vclock, so negate it
				// to answer the question the message asks.
				fmt.Printf("Found siblings in item %v (vclock found during insert? %v)\n", i, !noVclock[i])
			}
		}(i)
	}
	wg.Wait()
	if dead && *quit_early {
		fmt.Println("Quiting early due to failed fetches!")
		os.Exit(1)
	}
	fmt.Println("Done fetch")

	// And Delete
	keys, err := bucket.ListKeys()
	if err != nil {
		panic(err)
	}
	wg.Add(len(keys))
	for _, key := range keys {
		go func(key string) {
			defer wg.Done()
			obj, err := bucket.Get(key)
			if obj == nil {
				panic(err)
			}
			if err := obj.Destroy(); err != nil {
				panic(err)
			}
		}(string(key))
	}
	wg.Wait()
	fmt.Println("Done Delete")
	fmt.Println("DONE")
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.