Skip to content

Instantly share code, notes, and snippets.

@tanayabh

tanayabh/3c.go Secret

Created March 7, 2023 16:02
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save tanayabh/fe1283c93c5083a533d493244d8d0488 to your computer and use it in GitHub Desktop.
package main
import (
	"context"
	"encoding/json"
	"log"
	"reflect"
	"sync"
	"time"

	maelstrom "github.com/jepsen-io/maelstrom/demo/go"
)
// job is one pending delivery of a broadcast value from one node to another.
// The consumer goroutine sends Value to Dest as the "message" field of a
// "broadcast" request; Src records the node that enqueued the job.
type job struct {
	Src, Dest string
	Value     any
}
// consumeJobChannel receives jobs from queue until the channel is closed and
// delivers each one on its own goroutine. A delivery is re-attempted until the
// destination acknowledges it with "broadcast_ok", so a job is never dropped
// because of a failed or unanswered attempt.
//
// Retries happen inside the job's goroutine rather than by re-sending the job
// on queue, so retrying never competes with producers for the unbuffered
// channel and cannot send on a channel that has since been closed.
func consumeJobChannel(queue chan job, n *maelstrom.Node) {
	log.Printf("Started job queue")
	// Ranging over the channel stops cleanly on close instead of spinning on
	// zero-value jobs.
	for j := range queue {
		go func(jb job) {
			for !forwardBroadcast(n, jb) {
			}
		}(j)
	}
}

// forwardBroadcast makes a single attempt to deliver jb.Value to jb.Dest as a
// "broadcast" RPC. It reports true only when the response type is
// "broadcast_ok"; every failure is logged and reported as false so the caller
// can retry.
func forwardBroadcast(n *maelstrom.Node, jb job) bool {
	// Bound each attempt: without a deadline, an attempt whose reply never
	// arrives would block forever and the job would never be retried.
	const rpcTimeout = time.Second

	ctx, cancel := context.WithTimeout(context.Background(), rpcTimeout)
	defer cancel()

	body := map[string]any{
		"type":    "broadcast",
		"message": jb.Value,
	}
	message, err := n.SyncRPC(ctx, jb.Dest, body)
	if err != nil {
		log.Printf("Retry %v after err %v\n", jb, err.Error())
		return false
	}

	resBody := map[string]any{}
	if err := json.Unmarshal(message.Body, &resBody); err != nil {
		log.Printf("Retry %v after err %v\n", jb, err.Error())
		return false
	}
	log.Printf("saw %v and got %v", jb, resBody)

	// reflect's String does not panic on non-string or missing values; it
	// yields a placeholder that simply fails the comparison below.
	typ := reflect.ValueOf(resBody["type"]).String()
	if typ != "broadcast_ok" {
		log.Printf("Retry %v after return type %v\n", jb, typ)
		return false
	}
	return true
}
// main runs a Maelstrom broadcast node. It registers three handlers:
//
//   - "broadcast": records a value and, the first time the value is seen,
//     enqueues a delivery job for every other node except the sender.
//   - "read": replies with every value recorded so far.
//   - "topology": records this node's neighbours from the supplied topology.
//
// Delivery jobs are handed to consumeJobChannel through an unbuffered channel.
func main() {
	n := maelstrom.NewNode()

	// mu guards values and currentNeighbours, which are shared by the
	// handler closures below.
	mu := &sync.Mutex{}
	values := make(map[any]bool)
	currentNeighbours := make(map[string]bool)

	queue := make(chan job)
	go consumeJobChannel(queue, n)

	n.Handle("broadcast", func(msg maelstrom.Message) error {
		// Unmarshal the message body as a loosely-typed map.
		var body map[string]any
		if err := json.Unmarshal(msg.Body, &body); err != nil {
			return err
		}
		// JSON numbers decode to float64; reject anything else instead of
		// panicking on a missing or non-numeric message.
		message, ok := body["message"].(float64)
		if !ok {
			return maelstrom.NewRPCError(maelstrom.MalformedRequest, "broadcast message must be a number")
		}

		// Check-and-insert under one lock so a value is fanned out only by
		// the first handler invocation that sees it.
		mu.Lock()
		seen := values[message]
		values[message] = true
		mu.Unlock()

		if !seen {
			// Forward to every known node except this one and the sender.
			// The neighbours recorded by the topology handler are not used
			// for routing.
			for _, node := range n.NodeIDs() {
				if node != n.ID() && node != msg.Src {
					queue <- job{
						Src:   n.ID(),
						Dest:  node,
						Value: message,
					}
				}
			}
		}
		return n.Reply(msg, map[string]any{"type": "broadcast_ok"})
	})

	n.Handle("read", func(msg maelstrom.Message) error {
		// The request body carries nothing this handler needs, so it is not
		// decoded.
		mu.Lock()
		// Allocate a non-nil slice so an empty result encodes as [] rather
		// than JSON null.
		keys := make([]any, 0, len(values))
		for k := range values {
			keys = append(keys, k)
		}
		mu.Unlock()

		return n.Reply(msg, map[string]any{
			"type":     "read_ok",
			"messages": keys,
		})
	})

	n.Handle("topology", func(msg maelstrom.Message) error {
		// Unmarshal the message body as a loosely-typed map.
		var body map[string]any
		if err := json.Unmarshal(msg.Body, &body); err != nil {
			return err
		}
		topology, ok := body["topology"].(map[string]any)
		if !ok {
			return maelstrom.NewRPCError(maelstrom.MalformedRequest, "topology must be an object")
		}

		// Build the new neighbour set locally so a malformed entry leaves
		// the previously recorded set untouched.
		neighbours := make(map[string]bool)
		if raw, present := topology[n.ID()]; present {
			list, ok := raw.([]any)
			if !ok {
				return maelstrom.NewRPCError(maelstrom.MalformedRequest, "topology entries must be lists of node ids")
			}
			for _, v := range list {
				id, ok := v.(string)
				if !ok {
					return maelstrom.NewRPCError(maelstrom.MalformedRequest, "topology node ids must be strings")
				}
				neighbours[id] = true
			}
		}

		mu.Lock()
		currentNeighbours = neighbours
		log.Printf("neignbours: %v", currentNeighbours)
		mu.Unlock()

		return n.Reply(msg, map[string]any{"type": "topology_ok"})
	})

	if err := n.Run(); err != nil {
		log.Fatal(err)
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment