Skip to content

Instantly share code, notes, and snippets.

@tbg
Created June 1, 2015 19:39
Show Gist options
  • Save tbg/bb752a691a196104f344 to your computer and use it in GitHub Desktop.
@@ -120,6 +125,7 @@ func (t *rpcTransport) Stop(id proto.RaftNodeID) {
// need a feedback mechanism for that. Potentially easiest is to arrange for
// the next call to Send() to fail appropriately.
func (t *rpcTransport) processQueue(raftNodeID proto.RaftNodeID) {
+ trackIndex := uint64(0)
t.mu.Lock()
ch, ok := t.queues[raftNodeID]
t.mu.Unlock()
@@ -162,6 +168,9 @@ func (t *rpcTransport) processQueue(raftNodeID proto.RaftNodeID) {
case <-t.rpcContext.Stopper.ShouldStop():
return
case <-time.After(raftIdleTimeout):
+ if log.V(1) {
+ log.Infof("closing Raft transport to %d due to inactivity", nodeID)
+ }
return
case <-client.Closed:
log.Warningf("raft client for node %d closed", nodeID)
@@ -191,6 +200,20 @@ func (t *rpcTransport) processQueue(raftNodeID proto.RaftNodeID) {
return
}
client.Go(raftMessageName, protoReq, protoResp, done)
+ if req.Message.Type == raftpb.MsgApp {
+ if trackIndex == 0 {
+ trackIndex = req.Message.Index
+ }
+ log.Warningf("TOBIAS send %v %v: %d entries", req.Message.To, req.Message.Index,
+ len(req.Message.Entries))
+ for _, entry := range req.Message.Entries {
+ if entry.Index != trackIndex+1 && entry.Index > 100 {
+ log.Fatalf("out of order for node %d: last %d, now %d:\n%s", nodeID, trackIndex, entry.Index, raft.DescribeMessage(req.Message, nil))
+ }
+ trackIndex = entry.Index
+ }
+ }
+ // <-done
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment