@tamird
Created September 6, 2016 14:39
commit 1288923786d3dc39ebf18b214b579002185b5476
Author: Tamir Duberstein <tamird@gmail.com>
Date: Fri Sep 2 12:23:06 2016 -0400
storage: define streaming snapshots in terms of shared code
Reduces duplication between normal raft processing and streaming
snapshot application.
With @jordanlewis.
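
For orientation, here is a condensed sketch of the call graph this commit creates, using toy stand-in types rather than the real CockroachDB ones (the true signatures appear in the diff below): both the public Raft endpoint and the streaming-snapshot endpoint now funnel into a single internal handler, and an empty IncomingSnapshot marks the non-snapshot path.

package main

import "fmt"

// Toy stand-ins (assumptions, not the real types) showing the shape of
// the refactor: two public entry points, one shared handler.
type RaftMessageRequest struct{ RangeID int64 }

type IncomingSnapshot struct{ SnapUUID [16]byte }

type Store struct{}

// HandleRaftRequest is the plain Raft transport endpoint; it passes an
// empty IncomingSnapshot, meaning "no buffered snapshot".
func (s *Store) HandleRaftRequest(req *RaftMessageRequest) error {
	return s.handleRaftRequest(req, IncomingSnapshot{})
}

// HandleSnapshot is the streaming endpoint; after buffering the snapshot
// data it calls the same shared handler with a populated IncomingSnapshot.
func (s *Store) HandleSnapshot(req *RaftMessageRequest, inSnap IncomingSnapshot) error {
	return s.handleRaftRequest(req, inSnap)
}

// handleRaftRequest is the shared code the commit message refers to.
func (s *Store) handleRaftRequest(req *RaftMessageRequest, inSnap IncomingSnapshot) error {
	if (inSnap.SnapUUID == [16]byte{}) {
		fmt.Println("normal raft message for range", req.RangeID)
		return nil
	}
	fmt.Println("applying streamed snapshot for range", req.RangeID)
	return nil
}

func main() {
	s := &Store{}
	_ = s.HandleRaftRequest(&RaftMessageRequest{RangeID: 1})
	_ = s.HandleSnapshot(&RaftMessageRequest{RangeID: 1}, IncomingSnapshot{SnapUUID: [16]byte{1}})
}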
diff --git a/storage/replica.go b/storage/replica.go
index 593e008..7c60ff5 100644
--- a/storage/replica.go
+++ b/storage/replica.go
@@ -1647,7 +1647,7 @@ func (r *Replica) isRaftLeaderLocked() bool {
// handleRaftReady processes the Ready() messages on the replica if there are any. It takes a
-// non-nil IncomingSnapshot pointer to indicate that it is about to process a snapshot.
+// non-empty IncomingSnapshot to indicate that it is about to process a snapshot.
-func (r *Replica) handleRaftReady(inSnap *IncomingSnapshot) error {
+func (r *Replica) handleRaftReady(inSnap IncomingSnapshot) error {
ctx := r.ctx
var hasReady bool
var rd raft.Ready
@@ -1697,11 +1697,11 @@ func (r *Replica) handleRaftReady(inSnap *IncomingSnapshot) error {
if err != nil {
return errors.Wrap(err, "invalid snapshot id")
}
- if inSnap == nil || *snapUUID != inSnap.SnapUUID {
+ if *snapUUID != inSnap.SnapUUID {
log.Fatalf(ctx, "programming error: a snapshot application was attempted outside of the streaming snapshot codepath")
}
- if err := r.applySnapshot(ctx, *inSnap, rd.Snapshot, rd.HardState); err != nil {
+ if err := r.applySnapshot(ctx, inSnap, rd.Snapshot, rd.HardState); err != nil {
return err
}
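
The replica.go hunks above drop the pointer: an IncomingSnapshot is now passed by value and the nil check disappears, leaving a single assertion that any snapshot Raft surfaces is exactly the one the caller buffered. A toy rendering of that invariant, with stand-in types (the real check uses log.Fatalf and the uuid package rather than a returned error):

package main

import "fmt"

type uuidT [16]byte

type IncomingSnapshot struct{ SnapUUID uuidT }

// handleRaftReady sketches only the assertion from the hunk above: when
// Raft hands back a snapshot in its Ready, the UUID embedded in the
// snapshot data must match the snapshot buffered by the caller.
func handleRaftReady(inSnap IncomingSnapshot, readySnapUUID uuidT) error {
	if readySnapUUID != inSnap.SnapUUID {
		return fmt.Errorf("programming error: snapshot application attempted outside the streaming codepath")
	}
	fmt.Printf("applying snapshot %x\n", readySnapUUID)
	return nil
}

func main() {
	id := uuidT{0x01}
	if err := handleRaftReady(IncomingSnapshot{SnapUUID: id}, id); err != nil {
		fmt.Println(err)
	}
}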
diff --git a/storage/replica_raftstorage.go b/storage/replica_raftstorage.go
index e90e529..39f9e55 100644
--- a/storage/replica_raftstorage.go
+++ b/storage/replica_raftstorage.go
@@ -426,6 +426,10 @@ type IncomingSnapshot struct {
Batches [][]byte
// The Raft log entries for this snapshot.
LogEntries [][]byte
+
+ // If true, indicates that a placeholder for this snapshot needs to be
+ // removed once it is applied.
+ addedPlaceholder bool
}
// Close this OutgoingSnapshot, freeing associated resources.
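
The new addedPlaceholder field records whether the store registered a placeholder for the incoming range while the snapshot was streaming in; the apply path (see the store.go hunks below) consults it to remove that placeholder just before the real replica is swapped in. A minimal toy of that lifecycle, with stand-in types:

package main

import "fmt"

// Stand-in for the snapshot payload; only an ID and the field added by
// this commit are shown.
type IncomingSnapshot struct {
	RangeID          int64
	addedPlaceholder bool
}

// Store is a toy stand-in with a bare placeholder registry.
type Store struct {
	placeholders map[int64]bool
}

// applySnapshot mirrors the placeholder handling from the diff: the
// placeholder is cleared only if this snapshot was the one that added
// it, just before the real replica takes its place.
func (s *Store) applySnapshot(inSnap IncomingSnapshot) {
	if inSnap.addedPlaceholder {
		delete(s.placeholders, inSnap.RangeID)
		fmt.Println("removed placeholder for range", inSnap.RangeID)
	}
	fmt.Println("swapped in real replica for range", inSnap.RangeID)
}

func main() {
	s := &Store{placeholders: map[int64]bool{7: true}}
	s.applySnapshot(IncomingSnapshot{RangeID: 7, addedPlaceholder: true})
}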
diff --git a/storage/store.go b/storage/store.go
index 856ac2f..218f06e 100644
--- a/storage/store.go
+++ b/storage/store.go
@@ -2196,38 +2196,49 @@ func checkReplicaTooOld(r *Replica, fromReplicaID roachpb.ReplicaID) *roachpb.Er
// HandleSnapshot reads an incoming streaming snapshot and applies it if
// possible.
-func (s *Store) HandleSnapshot(header *SnapshotRequest_Header, stream MultiRaft_RaftSnapshotServer,
+func (s *Store) HandleSnapshot(
+ header *SnapshotRequest_Header,
+ stream MultiRaft_RaftSnapshotServer,
) error {
+
+ sendSnapError := func(err error) error {
+ return stream.Send(&SnapshotResponse{
+ Status: SnapshotResponse_ERROR,
+ Message: err.Error(),
+ })
+ }
+
+ ctx := s.logContext(stream.Context())
+
var addedPlaceholder, appliedSnapshot bool
if err := func() error {
s.mu.Lock()
defer s.mu.Unlock()
placeholder, err := s.canApplySnapshotLocked(&header.RangeDescriptor)
if err != nil {
- log.Info(stream.Context(),
- errors.Wrapf(err, "%s: cannot apply snapshot on range %d", s, header.RangeDescriptor.RangeID))
- return err
+ return sendSnapError(
+ errors.Wrapf(err, "%s: cannot apply snapshot on range %d", s, header.RangeDescriptor.RangeID),
+ )
}
- addedPlaceholder = placeholder != nil
+ addedPlaceholder = placeholder != nil
if addedPlaceholder {
if err := s.addPlaceholderLocked(placeholder); err != nil {
- log.Fatal(stream.Context(),
- errors.Wrapf(err, "%s: could not add vetted placeholder %s", s, placeholder))
+ log.Fatal(ctx,
+ errors.Wrapf(err, "%s: could not add vetted placeholder %s", s, placeholder),
+ )
}
}
return nil
}(); err != nil {
- return stream.Send(&SnapshotResponse{
- Status: SnapshotResponse_ERROR,
- Message: errors.Wrap(err, "permanently rejected snapshot").Error()})
+ return errors.Wrap(err, "permanently rejected snapshot")
}
defer func() {
if !appliedSnapshot {
s.mu.Lock()
if err := s.removePlaceholderLocked(header.RangeDescriptor.RangeID); err != nil {
- log.Warningf(stream.Context(), "failed to cleanup placeholder: %+v", err)
+ log.Warningf(ctx, "failed to cleanup placeholder: %s", err)
}
s.mu.Unlock()
}
@@ -2245,9 +2256,7 @@ func (s *Store) HandleSnapshot(header *SnapshotRequest_Header, stream MultiRaft_
return err
}
if req.Header != nil {
- return stream.Send(&SnapshotResponse{
- Status: SnapshotResponse_ERROR,
- Message: "client error: provided a header mid-stream"})
+ return sendSnapError(errors.New("client error: provided a header mid-stream"))
}
if req.KVBatch != nil {
@@ -2257,11 +2266,21 @@ func (s *Store) HandleSnapshot(header *SnapshotRequest_Header, stream MultiRaft_
logEntries = append(logEntries, req.LogEntries...)
}
if req.Final {
- if err := s.applySnapshot(
- stream.Context(), header, batches, logEntries, addedPlaceholder); err != nil {
- return stream.Send(&SnapshotResponse{
- Status: SnapshotResponse_ERROR,
- Message: errors.Wrap(err.GoError(), "failed to apply snapshot").Error()})
+ snapUUID, err := uuid.FromBytes(header.RaftMessageRequest.Message.Snapshot.Data)
+ if err != nil {
+ return sendSnapError(errors.Wrap(err, "invalid snapshot"))
+ }
+
+ inSnap := IncomingSnapshot{
+ SnapUUID: *snapUUID,
+ RangeDescriptor: header.RangeDescriptor,
+ Batches: batches,
+ LogEntries: logEntries,
+ addedPlaceholder: addedPlaceholder,
+ }
+
+ if err := s.handleRaftRequest(ctx, &header.RaftMessageRequest, inSnap); err != nil {
+ return sendSnapError(errors.Wrap(err.GoError(), "failed to apply snapshot"))
}
appliedSnapshot = true
return stream.Send(&SnapshotResponse{Status: SnapshotResponse_APPLIED})
@@ -2269,50 +2288,68 @@ func (s *Store) HandleSnapshot(header *SnapshotRequest_Header, stream MultiRaft_
}
}
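
The HandleSnapshot changes above replace repeated SnapshotResponse construction with a sendSnapError closure that captures the stream. The pattern, reduced to a self-contained toy (the real stream and response types are generated from CockroachDB's protobufs, and the %w wrapping here stands in for errors.Wrap):

package main

import (
	"errors"
	"fmt"
)

// Toy stand-ins for the generated gRPC stream and response types.
type snapshotResponse struct {
	status  string
	message string
}

type snapshotStream struct{}

func (snapshotStream) Send(resp *snapshotResponse) error {
	fmt.Printf("sent %s: %s\n", resp.status, resp.message)
	return nil
}

// handleSnapshot shows the pattern from the diff: one closure, defined up
// front, turns any local failure into a uniform ERROR response on the
// stream, so each failure site becomes a one-liner.
func handleSnapshot(stream snapshotStream) error {
	sendSnapError := func(err error) error {
		return stream.Send(&snapshotResponse{status: "ERROR", message: err.Error()})
	}

	if err := validateHeader(); err != nil {
		return sendSnapError(fmt.Errorf("cannot apply snapshot: %w", err))
	}
	return stream.Send(&snapshotResponse{status: "APPLIED"})
}

func validateHeader() error { return errors.New("placeholder conflict") }

func main() { _ = handleSnapshot(snapshotStream{}) }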
-// applySnapshot applies the snapshot given by the SnapshotRequest_Header
-// and the slice of KV BatchReprs and Raft log entries. Passing a true
-// addedPlaceholder indicates that a placeholder for this snapshot needs
-// to be removed once it is applied.
-func (s *Store) applySnapshot(
- ctx context.Context, header *SnapshotRequest_Header, batches [][]byte, logEntries [][]byte,
- addedPlaceholder bool,
-) *roachpb.Error {
+// HandleRaftRequest dispatches a raft message to the appropriate Replica. It
+// requires that s.processRaftMu and s.mu are not held.
+func (s *Store) HandleRaftRequest(ctx context.Context, req *RaftMessageRequest) *roachpb.Error {
+ return s.handleRaftRequest(s.logContext(ctx), req, IncomingSnapshot{})
+}
+
+// handleRaftRequest is the same as HandleRaftRequest except that it also
+// handles buffered streaming snapshots. If req.Message.Type is
+// raftpb.MsgSnap, inSnap must not be empty; in that case, inSnap will be
+// applied. Preemptive and raft-initiated snapshots are supported.
+func (s *Store) handleRaftRequest(ctx context.Context, req *RaftMessageRequest, inSnap IncomingSnapshot) *roachpb.Error {
s.processRaftMu.Lock()
defer s.processRaftMu.Unlock()
+ // Drop messages that come from a node that we believe was once
+ // a member of the group but has been removed.
s.mu.Lock()
- r, ok := s.mu.replicas[header.RangeDescriptor.RangeID]
- s.mu.Unlock()
- rmr := header.RaftMessageRequest
- if ok {
- if err := checkReplicaTooOld(r, rmr.FromReplica.ReplicaID); err != nil {
+ if r, ok := s.mu.replicas[req.RangeID]; ok {
+ if err := checkReplicaTooOld(r, req.FromReplica.ReplicaID); err != nil {
+ s.mu.Unlock()
return err
}
}
- s.mu.Lock()
+ s.metrics.raftRcvdMessages[req.Message.Type].Inc(1)
+
+ switch req.Message.Type {
+ case raftpb.MsgSnap:
+ if (inSnap.SnapUUID == uuid.UUID{}) {
+ log.Fatalf(ctx, "programming error: received %s via the Raft transport", raftpb.MsgSnap)
+ }
+
+ case raftpb.MsgHeartbeat:
+ // TODO(bdarnell): handle coalesced heartbeats.
+ }
+
// Lazily create the replica.
- r, err := s.getOrCreateReplicaLocked(header.RangeDescriptor.RangeID,
- rmr.ToReplica.ReplicaID, rmr.FromReplica)
+ r, err := s.getOrCreateReplicaLocked(req.RangeID, req.ToReplica.ReplicaID, req.FromReplica)
// TODO(bdarnell): is it safe to release the store lock here?
// It deadlocks to hold s.Mutex while calling raftGroup.Step.
s.mu.Unlock()
if err != nil {
return roachpb.NewError(err)
}
- r.setLastReplicaDescriptors(rmr)
- snapUUID, err := uuid.FromBytes(rmr.Message.Snapshot.Data)
- if err != nil {
- return roachpb.NewError(errors.Wrap(err, "invalid snapshot: unparseable uuid"))
- }
- inSnap := IncomingSnapshot{
- SnapUUID: *snapUUID,
- RangeDescriptor: header.RangeDescriptor,
- Batches: batches,
- LogEntries: logEntries,
- }
+ r.setLastReplicaDescriptors(*req)
+
+ if req.ToReplica.ReplicaID == 0 {
+ if req.Message.Type != raftpb.MsgSnap {
+ // We disallow non-snapshot messages to replica ID 0. Note that
+ // getOrCreateReplicaLocked disallows moving the replica ID backward, so
+ // the only way we can get here is if the replica did not previously exist.
+ if log.V(1) {
+ log.Infof(ctx, "refusing incoming Raft message %s for range %d from %+v to %+v",
+ req.Message.Type, req.RangeID, req.FromReplica, req.ToReplica)
+ }
+ return roachpb.NewErrorf(
+ "cannot recreate replica that is not a member of its range (StoreID %s not found in range %d)",
+ r.store.StoreID(),
+ req.RangeID,
+ )
+ }
- if rmr.ToReplica.ReplicaID == 0 {
// Allow snapshots to be applied to replicas before they are
// members of the raft group (i.e. replicas with an ID of 0). This
// is the only operation that can be performed before it is part of
@@ -2322,10 +2359,10 @@ func (s *Store) applySnapshot(
// get all of Raft's internal safety checks (it confuses messages
// at term zero for internal messages). The sending side uses the
// term from the snapshot itself, but we'll just check nonzero.
- if rmr.Message.Term == 0 {
+ if req.Message.Term == 0 {
return roachpb.NewErrorf(
"preemptive snapshot from term %d received with zero term",
- rmr.Message.Snapshot.Metadata.Term,
+ req.Message.Snapshot.Metadata.Term,
)
}
// TODO(tschottdorf): A lot of locking of the individual Replica
@@ -2387,7 +2424,7 @@ func (s *Store) applySnapshot(
return roachpb.NewError(err)
}
// We have a Raft group; feed it the message.
- if err := raftGroup.Step(header.RaftMessageRequest.Message); err != nil {
+ if err := raftGroup.Step(req.Message); err != nil {
return roachpb.NewError(errors.Wrap(err, "unable to process preemptive snapshot"))
}
// In the normal case, the group should ask us to apply a snapshot.
@@ -2409,9 +2446,9 @@ func (s *Store) applySnapshot(
s.mu.Lock()
defer s.mu.Unlock()
- if addedPlaceholder {
+ if inSnap.addedPlaceholder {
// Clear the replica placeholder; we are about to swap it with a real replica.
- if err := s.removePlaceholderLocked(header.RangeDescriptor.RangeID); err != nil {
+ if err := s.removePlaceholderLocked(inSnap.RangeDescriptor.RangeID); err != nil {
return roachpb.NewError(err)
}
}
@@ -2429,75 +2466,20 @@ func (s *Store) applySnapshot(
}
if err := r.withRaftGroup(func(raftGroup *raft.RawNode) error {
- return raftGroup.Step(header.RaftMessageRequest.Message)
+ return raftGroup.Step(req.Message)
}); err != nil {
return roachpb.NewError(err)
}
- // Force the replica to deal with this snapshot right now.
- if err := r.handleRaftReady(&inSnap); err != nil {
- return roachpb.NewError(errors.Wrap(err, "Didn't process snapshot."))
- }
- return nil
-}
-
-// HandleRaftRequest dispatches a raft message to the appropriate Replica. It
-// requires that s.processRaftMu and s.mu are not held.
-func (s *Store) HandleRaftRequest(ctx context.Context, req *RaftMessageRequest) *roachpb.Error {
- s.processRaftMu.Lock()
- defer s.processRaftMu.Unlock()
-
- ctx = s.logContext(ctx)
-
- // Drop messages that come from a node that we believe was once
- // a member of the group but has been removed.
- s.mu.Lock()
- if r, ok := s.mu.replicas[req.RangeID]; ok {
- if err := checkReplicaTooOld(r, req.FromReplica.ReplicaID); err != nil {
- s.mu.Unlock()
- return err
- }
- }
-
- s.metrics.raftRcvdMessages[req.Message.Type].Inc(1)
-
- switch req.Message.Type {
- case raftpb.MsgSnap:
- // All snapshots should be handled by the streaming endpoint.
- log.Fatal(ctx, "programming error: received a snapshot via the Raft endpoint")
-
- case raftpb.MsgHeartbeat:
- // TODO(bdarnell): handle coalesced heartbeats.
- }
-
- // Lazily create the replica.
- r, err := s.getOrCreateReplicaLocked(req.RangeID, req.ToReplica.ReplicaID, req.FromReplica)
- // TODO(bdarnell): is it safe to release the store lock here?
- // It deadlocks to hold s.Mutex while calling raftGroup.Step.
- s.mu.Unlock()
- if err != nil {
- return roachpb.NewError(err)
- }
- r.setLastReplicaDescriptors(*req)
- if req.ToReplica.ReplicaID == 0 {
- // We disallow non-snapshot messages to replica ID 0. Note that
- // getOrCreateReplicaLocked disallows moving the replica ID backward, so
- // the only way we can get here is if the replica did not previously exist.
- if log.V(1) {
- log.Infof(ctx, "refusing incoming Raft message %s for range %d from %+v to %+v",
- req.Message.Type, req.RangeID, req.FromReplica, req.ToReplica)
+ if (inSnap.SnapUUID == uuid.UUID{}) {
+ s.enqueueRaftUpdateCheck(req.RangeID)
+ } else {
+ // Force the replica to deal with this snapshot right now.
+ if err := r.handleRaftReady(inSnap); err != nil {
+ return roachpb.NewError(errors.Wrap(err, "didn't process snapshot"))
}
- return roachpb.NewErrorf("cannot recreate replica that is not a member of its range (StoreID %s not found in range %d)",
- r.store.StoreID(), req.RangeID)
- }
-
- if err := r.withRaftGroup(func(raftGroup *raft.RawNode) error {
- return raftGroup.Step(req.Message)
- }); err != nil {
- return roachpb.NewError(err)
}
- s.enqueueRaftUpdateCheck(req.RangeID)
return nil
}
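
Two guards in handleRaftRequest above are worth calling out: a MsgSnap arriving without a buffered snapshot is a programming error (the streaming endpoint is the only legal source), and a not-yet-existing replica (ReplicaID 0) may only be created by a snapshot, preemptive or raft-initiated. A compressed toy of both checks, with stand-in types:

package main

import (
	"fmt"
	"log"
)

type msgType int

const (
	msgApp msgType = iota
	msgSnap
)

// checkIncoming condenses the two guards from handleRaftRequest above
// (stand-in types; the real checks live in the diff).
func checkIncoming(typ msgType, haveBufferedSnap bool, toReplicaID int) error {
	if typ == msgSnap && !haveBufferedSnap {
		// Mirrors the log.Fatalf in the diff: snapshots must arrive via
		// the streaming endpoint, never bare over the Raft transport.
		log.Fatal("programming error: MsgSnap without a buffered snapshot")
	}
	if toReplicaID == 0 && typ != msgSnap {
		return fmt.Errorf("cannot recreate replica that is not a member of its range")
	}
	return nil
}

func main() {
	fmt.Println(checkIncoming(msgApp, false, 0)) // refused
	fmt.Println(checkIncoming(msgSnap, true, 0)) // nil: preemptive snapshot allowed
}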
@@ -2747,7 +2729,7 @@ func (s *Store) processRaft() {
// parallel.
for _, r := range uninitReplicas {
start := timeutil.Now()
- if err := r.handleRaftReady(nil); err != nil {
+ if err := r.handleRaftReady(IncomingSnapshot{}); err != nil {
panic(err) // TODO(bdarnell)
}
maybeWarnDuration(start, r, "handle raft ready")
@@ -2760,7 +2742,7 @@ func (s *Store) processRaft() {
workQueue <- func() {
defer wg.Done()
start := timeutil.Now()
- if err := r.handleRaftReady(nil); err != nil {
+ if err := r.handleRaftReady(IncomingSnapshot{}); err != nil {
panic(err) // TODO(bdarnell)
}
maybeWarnDuration(start, r, "handle raft ready")