-
-
Save tanayabh/fe1283c93c5083a533d493244d8d0488 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import (
	"context"
	"encoding/json"
	"log"
	"reflect"
	"sync"
	"time"

	maelstrom "github.com/jepsen-io/maelstrom/demo/go"
)
// job is one pending delivery of a broadcast value to another node. Jobs are
// pushed onto the queue by the broadcast handler and re-pushed by the consumer
// whenever a delivery attempt does not succeed.
type job struct {
	Src, Dest string // ID of the forwarding node and ID of the node to deliver to
	Value     any    // payload sent as the "message" field of the broadcast RPC
}
func consumeJobChannel(queue chan job, n *maelstrom.Node) { | |
log.Printf("Started job queue") | |
for { | |
j, _ := <-queue | |
go func(jb job) { | |
body := map[string]any{} | |
body["type"] = "broadcast" | |
body["message"] = jb.Value | |
message, err := n.SyncRPC(context.Background(), jb.Dest, body) | |
if err != nil { | |
log.Printf("Retry %v after err %v\n", jb, err.Error()) | |
queue <- jb | |
return | |
} | |
resBody := map[string]any{} | |
if err := json.Unmarshal(message.Body, &resBody); err != nil { | |
log.Printf("Retry %v after err %v\n", jb, err.Error()) | |
queue <- jb | |
return | |
} | |
log.Printf("saw %v and got %v", jb, resBody) | |
typ := reflect.ValueOf(resBody["type"]).String() | |
if typ != "broadcast_ok" { | |
log.Printf("Retry %v after return type %v\n", jb, typ) | |
queue <- jb | |
return | |
} | |
}(j) | |
} | |
} | |
func main() { | |
n := maelstrom.NewNode() | |
mu := &sync.Mutex{} | |
values := make(map[any]bool) | |
currentNeighbours := make(map[string]bool) | |
topology := make(map[string]interface{}) | |
queue := make(chan job) | |
go consumeJobChannel(queue, n) | |
n.Handle("broadcast", func(msg maelstrom.Message) error { | |
// Unmarshal the message body as an loosely-typed map. | |
var body map[string]any | |
if err := json.Unmarshal(msg.Body, &body); err != nil { | |
return err | |
} | |
message := reflect.ValueOf(body["message"]).Float() | |
seen := false | |
mu.Lock() | |
_, ok := values[message] | |
if ok { | |
seen = true | |
} else { | |
values[message] = true | |
} | |
mu.Unlock() | |
if seen == false { | |
//for neighbour, _ := range currentNeighbours { | |
// if neighbour != msg.Src { | |
// queue <- job{ | |
// Src: n.ID(), | |
// Dest: neighbour, | |
// Value: message, | |
// } | |
// } | |
//} | |
for _, node := range n.NodeIDs() { | |
if node != n.ID() && node != msg.Src { | |
queue <- job{ | |
Src: n.ID(), | |
Dest: node, | |
Value: message, | |
} | |
} | |
} | |
} | |
return n.Reply(msg, map[string]any{"type": "broadcast_ok"}) | |
}) | |
n.Handle("read", func(msg maelstrom.Message) error { | |
// Unmarshal the message body as an loosely-typed map. | |
var body map[string]any | |
if err := json.Unmarshal(msg.Body, &body); err != nil { | |
return err | |
} | |
body = map[string]any{} | |
body["type"] = "read_ok" | |
var keys []any | |
mu.Lock() | |
for k := range values { | |
keys = append(keys, k) | |
} | |
mu.Unlock() | |
body["messages"] = keys | |
return n.Reply(msg, body) | |
}) | |
n.Handle("topology", func(msg maelstrom.Message) error { | |
// Unmarshal the message body as an loosely-typed map. | |
var body map[string]any | |
if err := json.Unmarshal(msg.Body, &body); err != nil { | |
return err | |
} | |
topology = body["topology"].(map[string]interface{}) | |
currentNeighbours = make(map[string]bool) | |
for node, neighbours := range topology { | |
if node == n.ID() { | |
for _, v := range neighbours.([]interface{}) { | |
neighbour := v.(string) | |
currentNeighbours[neighbour] = true | |
} | |
} | |
} | |
log.Printf("neignbours: %v", currentNeighbours) | |
body = map[string]any{} | |
body["type"] = "topology_ok" | |
return n.Reply(msg, body) | |
}) | |
if err := n.Run(); err != nil { | |
log.Fatal(err) | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment