Skip to content

Instantly share code, notes, and snippets.

@ahmetb
Last active March 1, 2023 21:30
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ahmetb/1872db1f14e74aa3e7ca6484673e9443 to your computer and use it in GitHub Desktop.
Save ahmetb/1872db1f14e74aa3e7ca6484673e9443 to your computer and use it in GitHub Desktop.
package main
import (
"context"
"encoding/json"
"errors"
"fmt"
"log"
"sync"
maelstrom "github.com/jepsen-io/maelstrom/demo/go"
)
type emptyMsg struct {
Type string `json:"type"`
}
type readResp struct {
Type string `json:"type"`
Value int `json:"value"`
}
type addReq struct {
Type string `json:"type"`
Value int `json:"value"`
}
type versionedVal struct {
Generation int `json:"generation"`
Value int `json:"value"`
}
func main() {
const key = "key"
node := maelstrom.NewNode()
n.Handle("read", func(msg maelstrom.Message) error {
seqConsistentKV := maelstrom.NewSeqKV(node)
v, err := seqConsistentKV.Read(context.TODO(), key)
if err != nil {
if isKeyNotExists(err) {
return n.Reply(msg, readResp{Type: "read_ok", Value: 0})
}
log.Println(err)
return err
}
rv := v.(versionedVal)
return n.Reply(msg, readResp{Type: "read_ok", Value: rv.Value})
})
n.Handle("add", func(msg maelstrom.Message) error {
seqConsistentKV := maelstrom.NewSeqKV(node)
var req addReq
if err := json.Unmarshal(msg.Body, &req); err != nil {
return err
}
ctx := context.Background()
for {
v, err := seqConsistentKV.Read(ctx, key)
if err != nil {
if !isKeyNotExists(err) {
return fmt.Errorf("read err: %w", err)
}
if err := seqConsistentKV.Write(ctx, key, versionedVal{Generation: 1, Value: 0}); err != nil {
return fmt.Errorf("initial write err: %w", err)
}
continue // empty key started, try again
}
readVal := v.(versionedVal)
err = seqConsistentKV.CompareAndSwap(ctx, key,
readVal,
versionedVal{
Generation: readVal.Generation + 1,
Value: readVal.Value + req.Value}, false)
if err == nil {
return n.Reply(msg, emptyMsg{Type: "add_ok"})
}
var kvErr *maelstrom.RPCError
if errors.As(err, &kvErr) && kvErr.Code == maelstrom.PreconditionFailed {
continue // write contention, try again
} else {
log.Println("cas err", err)
return fmt.Errorf("cas err: %w", err)
}
}
})
if err := n.Run(); err != nil {
log.Fatal(err)
}
}
func isKeyNotExists(err error) bool {
var rpcErr *maelstrom.RPCError
if !errors.As(err, &rpcErr) {
return false
}
return rpcErr.Code == maelstrom.KeyDoesNotExist
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment