Skip to content

Instantly share code, notes, and snippets.

@alonmuroch
Created August 26, 2021 12:57
Show Gist options
  • Save alonmuroch/b622c7c4b057f1d772ff5806c8d1bd66 to your computer and use it in GitHub Desktop.
Save alonmuroch/b622c7c4b057f1d772ff5806c8d1bd66 to your computer and use it in GitHub Desktop.
// ProcessMessage pops at most one message from the queue, keyed by the
// instance's current lambda, sequence number and round, and runs it through
// the pipeline that matches its message type.
//
// processedMsg is false only when the queue had no message for that key. It
// is true whenever a message was popped, including when the message type is
// not recognized (the message is logged and dropped) and when the pipeline
// fails. err is the error returned by the pipeline's Run, if any.
func (i *Instance) ProcessMessage() (processedMsg bool, err error) {
	queueKey := msgqueue.IBFTMessageIndexKey(i.State.Lambda.Get(), i.State.SeqNumber.Get(), i.State.Round.Get())
	queued := i.MsgQueue.PopMessage(queueKey)
	if queued == nil {
		// Nothing is waiting for the current lambda/sequence/round.
		return false, nil
	}

	signed := queued.SignedMessage

	// Select the pipeline for this message type.
	var selected pipeline.Pipeline
	switch signed.Message.Type {
	case proto.RoundState_PrePrepare:
		selected = i.prePrepareMsgPipeline()
	case proto.RoundState_Prepare:
		selected = i.prepareMsgPipeline()
	case proto.RoundState_Commit:
		selected = i.commitMsgPipeline()
	case proto.RoundState_ChangeRound:
		selected = i.changeRoundFullQuorumMsgPipeline()
	default:
		// The message was already removed from the queue, so it still counts
		// as processed even though no pipeline ran.
		i.Logger.Warn("undefined message type", zap.Any("msg", signed))
		return true, nil
	}

	if runErr := selected.Run(signed); runErr != nil {
		return true, runErr
	}
	return true, nil
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment