Skip to content

Instantly share code, notes, and snippets.

@masroorhasan
Created February 28, 2020 18:35
Show Gist options
  • Save masroorhasan/53db16adda94e7f2209afc80de98d93a to your computer and use it in GitHub Desktop.
Save masroorhasan/53db16adda94e7f2209afc80de98d93a to your computer and use it in GitHub Desktop.
package main
import (
"context"
"flag"
"fmt"
"log"
"sync"
"time"
"github.com/coreos/etcd/clientv3/clientv3util"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/clientv3/concurrency"
)
var (
key = flag.String("key", "", "etcd key")
)
func doTxn(ctx context.Context, c *clientv3.Client, key, val string) (bool, error) {
// insert on key MISS
tr, err := c.Txn(ctx).
If(clientv3util.KeyMissing(key)).
Then(
clientv3.OpPut(key, val),
clientv3.OpGet(key, clientv3.WithLastRev()...),
).Else(clientv3.OpGet(key, clientv3.WithLastRev()...)).
Commit()
if err != nil {
log.Fatalf("failed to PUT txn on miss: %v", err)
return false, err
}
if tr.Succeeded {
log.Printf("MISS: %v", tr.Responses[0])
return true, nil
}
// update on key HIT
newV := fmt.Sprintf("%s,%s", val, tr.Responses[0].GetResponseRange().Kvs[0].Value)
tr, err = c.Txn(ctx).
If(
clientv3.Compare(clientv3.ModRevision(key), "=", tr.Responses[0].GetResponseRange().Kvs[0].ModRevision),
).
Then(
clientv3.OpPut(key, newV),
clientv3.OpGet(key, clientv3.WithLastRev()...),
).
Commit()
if err != nil {
log.Fatalf("failed to PUT txn on hit: %v", err)
return false, err
}
return tr.Succeeded, nil
}
func doSTM(ctx context.Context, c *clientv3.Client, key, val string) (bool, error) {
stm := func(stm concurrency.STM) error {
oval := stm.Get(key)
if oval == "" {
stm.Put(key, val)
return nil
}
newV := fmt.Sprintf("%s,%s", val, oval)
stm.Put(key, newV)
return nil
}
tr, err := concurrency.NewSTM(c, stm)
if err != nil {
return false, err
}
return tr.Succeeded, nil
}
func main() {
ctx := context.Background()
c, err := clientv3.New(clientv3.Config{
Endpoints: []string{"localhost:2379"},
DialTimeout: 10 * time.Second,
})
if err != nil {
log.Fatalf("failed to create etcd client: %v", err)
}
flag.Parse()
if *key == "" {
log.Fatalf("must specify key")
}
// do updates
wg := &sync.WaitGroup{}
for i := 0; i < 3; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
val := fmt.Sprintf("%d", id)
fmt.Printf("put, key=%s, val=%s\n", *key, val)
ok, err := doTxn(ctx, c, *key, val)
if err != nil {
log.Fatalf("shit is fucked yo")
return
}
if ok {
log.Printf("successful put for key=%s val=%s\n", *key, val)
} else {
log.Print("y u no update")
}
}(i)
}
wg.Wait()
log.Print("done")
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment