commit 1288923786d3dc39ebf18b214b579002185b5476
Author: Tamir Duberstein <tamird@gmail.com>
Date:   Fri Sep 2 12:23:06 2016 -0400

    storage: define streaming snapshots in terms of shared code

    Reduces duplication between normal raft processing and streaming
    snapshot application.

    With @jordanlewis.
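
Annotation: the heart of this change is replacing the nil *IncomingSnapshot pointer with a zero-valued IncomingSnapshot, using an all-zero SnapUUID as the "no snapshot" sentinel. A minimal, self-contained sketch of that pattern follows (simplified stand-in types, not the actual cockroachdb code):

    package main

    import "fmt"

    // UUID stands in for the uuid.UUID value type in the real code; its zero
    // value is comparable, which is what makes the sentinel check work.
    type UUID [16]byte

    // IncomingSnapshot mirrors the struct in replica_raftstorage.go, trimmed
    // to the one field the sentinel check needs.
    type IncomingSnapshot struct {
        SnapUUID UUID
    }

    // handleRaftReady takes the snapshot by value; an all-zero SnapUUID means
    // "no snapshot attached", replacing the old nil-pointer convention.
    func handleRaftReady(inSnap IncomingSnapshot) {
        if (inSnap.SnapUUID == UUID{}) {
            fmt.Println("no snapshot: normal raft processing")
            return
        }
        fmt.Println("applying streamed snapshot")
    }

    func main() {
        handleRaftReady(IncomingSnapshot{})                     // normal raft path
        handleRaftReady(IncomingSnapshot{SnapUUID: UUID{0: 1}}) // snapshot path
    }

Passing the struct by value removes an entire class of nil-pointer checks from the call sites, as the first hunk below shows.
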
diff --git a/storage/replica.go b/storage/replica.go
index 593e008..7c60ff5 100644
--- a/storage/replica.go
+++ b/storage/replica.go
@@ -1647,7 +1647,7 @@ func (r *Replica) isRaftLeaderLocked() bool {
// handleRaftReady processes the Ready() messages on the replica if there are any. It takes a
// non-nil IncomingSnapshot pointer to indicate that it is about to process a snapshot.
-func (r *Replica) handleRaftReady(inSnap *IncomingSnapshot) error {
+func (r *Replica) handleRaftReady(inSnap IncomingSnapshot) error {
ctx := r.ctx
var hasReady bool
var rd raft.Ready
@@ -1697,11 +1697,11 @@ func (r *Replica) handleRaftReady(inSnap *IncomingSnapshot) error {
if err != nil {
return errors.Wrap(err, "invalid snapshot id")
}
- if inSnap == nil || *snapUUID != inSnap.SnapUUID {
+ if *snapUUID != inSnap.SnapUUID {
log.Fatalf(ctx, "programming error: a snapshot application was attempted outside of the streaming snapshot codepath")
}
- if err := r.applySnapshot(ctx, *inSnap, rd.Snapshot, rd.HardState); err != nil {
+ if err := r.applySnapshot(ctx, inSnap, rd.Snapshot, rd.HardState); err != nil {
return err
}
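
Annotation: the UUID comparison above leans on a convention visible later in the diff: the raft snapshot's Data field carries the 16-byte snapshot UUID. A hedged sketch of that round trip, with a local stand-in for the uuid package (fromBytes is hypothetical, shaped like the uuid.FromBytes call in the diff):

    package main

    import (
        "errors"
        "fmt"
    )

    // UUID is a local stand-in for the uuid package used by the real code.
    type UUID [16]byte

    // fromBytes mimics the shape of the uuid.FromBytes call in the diff: it
    // fails unless given exactly 16 bytes, which is how an unparseable
    // snapshot ID gets rejected with "invalid snapshot id".
    func fromBytes(b []byte) (*UUID, error) {
        var u UUID
        if len(b) != len(u) {
            return nil, errors.New("invalid snapshot id")
        }
        copy(u[:], b)
        return &u, nil
    }

    func main() {
        sent := UUID{0xde, 0xad, 0xbe, 0xef}
        // Sender stashes the UUID in the raft snapshot's Data field...
        data := sent[:]
        // ...receiver parses it back and compares with the streamed snapshot.
        got, err := fromBytes(data)
        if err != nil {
            panic(err)
        }
        fmt.Println("UUIDs match:", *got == sent)
    }
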
diff --git a/storage/replica_raftstorage.go b/storage/replica_raftstorage.go
index e90e529..39f9e55 100644
--- a/storage/replica_raftstorage.go
+++ b/storage/replica_raftstorage.go
@@ -426,6 +426,10 @@ type IncomingSnapshot struct {
Batches [][]byte
// The Raft log entries for this snapshot.
LogEntries [][]byte
+
+ // If true, indicates that a placeholder for this snapshot needs to be
+ // removed once it is applied.
+ addedPlaceholder bool
}

// Close this OutgoingSnapshot, freeing associated resources.
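
Annotation: the new addedPlaceholder field lets placeholder cleanup travel with the snapshot instead of being threaded through as a separate argument. A toy sketch of the lifecycle it encodes, assuming a mutex-guarded placeholder map (all names hypothetical):

    package main

    import (
        "fmt"
        "sync"
    )

    // store is a toy stand-in for storage.Store: a placeholder reserves a
    // range ID while a snapshot for it is in flight.
    type store struct {
        mu           sync.Mutex
        placeholders map[int64]bool
    }

    func (s *store) addPlaceholder(rangeID int64) {
        s.mu.Lock()
        defer s.mu.Unlock()
        s.placeholders[rangeID] = true
    }

    func (s *store) removePlaceholder(rangeID int64) {
        s.mu.Lock()
        defer s.mu.Unlock()
        delete(s.placeholders, rangeID)
    }

    type incomingSnapshot struct {
        rangeID          int64
        addedPlaceholder bool // mirrors the new field in the diff
    }

    // apply removes the placeholder exactly when the snapshot that reserved
    // it is applied, which is the invariant the new field encodes.
    func (s *store) apply(inSnap incomingSnapshot) {
        if inSnap.addedPlaceholder {
            s.removePlaceholder(inSnap.rangeID)
        }
        fmt.Println("applied snapshot for range", inSnap.rangeID)
    }

    func main() {
        s := &store{placeholders: map[int64]bool{}}
        s.addPlaceholder(7)
        s.apply(incomingSnapshot{rangeID: 7, addedPlaceholder: true})
        fmt.Println("placeholders left:", len(s.placeholders))
    }
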
diff --git a/storage/store.go b/storage/store.go
index 856ac2f..218f06e 100644
--- a/storage/store.go
+++ b/storage/store.go
@@ -2196,38 +2196,49 @@ func checkReplicaTooOld(r *Replica, fromReplicaID roachpb.ReplicaID) *roachpb.Er
// HandleSnapshot reads an incoming streaming snapshot and applies it if
// possible.
-func (s *Store) HandleSnapshot(header *SnapshotRequest_Header, stream MultiRaft_RaftSnapshotServer,
+func (s *Store) HandleSnapshot(
+ header *SnapshotRequest_Header,
+ stream MultiRaft_RaftSnapshotServer,
) error {
+
+ sendSnapError := func(err error) error {
+ return stream.Send(&SnapshotResponse{
+ Status: SnapshotResponse_ERROR,
+ Message: err.Error(),
+ })
+ }
+
+ ctx := s.logContext(stream.Context())
+
var addedPlaceholder, appliedSnapshot bool
if err := func() error {
s.mu.Lock()
defer s.mu.Unlock()
placeholder, err := s.canApplySnapshotLocked(&header.RangeDescriptor)
if err != nil {
- log.Info(stream.Context(),
- errors.Wrapf(err, "%s: cannot apply snapshot on range %d", s, header.RangeDescriptor.RangeID))
- return err
+ return sendSnapError(
+ errors.Wrapf(err, "%s: cannot apply snapshot on range %d", s, header.RangeDescriptor.RangeID),
+ )
}
- addedPlaceholder = placeholder != nil
+ addedPlaceholder = placeholder != nil
if addedPlaceholder {
if err := s.addPlaceholderLocked(placeholder); err != nil {
- log.Fatal(stream.Context(),
- errors.Wrapf(err, "%s: could not add vetted placeholder %s", s, placeholder))
+ log.Fatal(ctx,
+ errors.Wrapf(err, "%s: could not add vetted placeholder %s", s, placeholder),
+ )
}
}
return nil
}(); err != nil {
- return stream.Send(&SnapshotResponse{
- Status: SnapshotResponse_ERROR,
- Message: errors.Wrap(err, "permanently rejected snapshot").Error()})
+ return errors.Wrap(err, "permanently rejected snapshot")
}
defer func() {
if !appliedSnapshot {
s.mu.Lock()
if err := s.removePlaceholderLocked(header.RangeDescriptor.RangeID); err != nil {
- log.Warningf(stream.Context(), "failed to cleanup placeholder: %+v", err)
+ log.Warningf(ctx, "failed to cleanup placeholder: %s", err)
}
s.mu.Unlock()
}
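
Annotation: the sendSnapError closure introduced above collapses the repeated stream.Send(&SnapshotResponse{...}) boilerplate on error paths. A sketch of the pattern against simplified stand-ins for the generated gRPC types:

    package main

    import (
        "errors"
        "fmt"
    )

    // snapshotResponse and raftStream are simplified stand-ins for the
    // generated gRPC types (SnapshotResponse, MultiRaft_RaftSnapshotServer).
    type snapshotResponse struct {
        Status  string
        Message string
    }

    type raftStream struct{}

    func (raftStream) Send(resp *snapshotResponse) error {
        fmt.Printf("sent: %+v\n", *resp)
        return nil
    }

    func handleSnapshot(stream raftStream) error {
        // Defined once, reused on every error path; the RPC itself succeeds
        // unless Send fails, so the client sees a structured ERROR response
        // rather than a torn-down stream.
        sendSnapError := func(err error) error {
            return stream.Send(&snapshotResponse{Status: "ERROR", Message: err.Error()})
        }

        return sendSnapError(errors.New("client error: provided a header mid-stream"))
    }

    func main() {
        _ = handleSnapshot(raftStream{})
    }
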
@@ -2245,9 +2256,7 @@ func (s *Store) HandleSnapshot(header *SnapshotRequest_Header, stream MultiRaft_
return err
}
if req.Header != nil {
- return stream.Send(&SnapshotResponse{
- Status: SnapshotResponse_ERROR,
- Message: "client error: provided a header mid-stream"})
+ return sendSnapError(errors.New("client error: provided a header mid-stream"))
}
if req.KVBatch != nil {
@@ -2257,11 +2266,21 @@ func (s *Store) HandleSnapshot(header *SnapshotRequest_Header, stream MultiRaft_
logEntries = append(logEntries, req.LogEntries...)
}
if req.Final {
- if err := s.applySnapshot(
- stream.Context(), header, batches, logEntries, addedPlaceholder); err != nil {
- return stream.Send(&SnapshotResponse{
- Status: SnapshotResponse_ERROR,
- Message: errors.Wrap(err.GoError(), "failed to apply snapshot").Error()})
+ snapUUID, err := uuid.FromBytes(header.RaftMessageRequest.Message.Snapshot.Data)
+ if err != nil {
+ return sendSnapError(errors.Wrap(err, "invalid snapshot"))
+ }
+
+ inSnap := IncomingSnapshot{
+ SnapUUID: *snapUUID,
+ RangeDescriptor: header.RangeDescriptor,
+ Batches: batches,
+ LogEntries: logEntries,
+ addedPlaceholder: addedPlaceholder,
+ }
+
+ if err := s.handleRaftRequest(ctx, &header.RaftMessageRequest, inSnap); err != nil {
+ return sendSnapError(errors.Wrap(err.GoError(), "failed to apply snapshot"))
}
appliedSnapshot = true
return stream.Send(&SnapshotResponse{Status: SnapshotResponse_APPLIED})
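
Annotation: on req.Final, the batches and log entries accumulated over the stream are packaged into an IncomingSnapshot and handed to the shared raft path. A condensed sketch of that accumulate-then-dispatch loop (request type trimmed to the fields used here):

    package main

    import "fmt"

    // snapshotRequest is a trimmed stand-in for the streamed SnapshotRequest.
    type snapshotRequest struct {
        KVBatch    []byte
        LogEntries [][]byte
        Final      bool
    }

    func main() {
        stream := []snapshotRequest{
            {KVBatch: []byte("batch-1")},
            {KVBatch: []byte("batch-2"), LogEntries: [][]byte{[]byte("entry-1")}},
            {Final: true},
        }

        var batches, logEntries [][]byte
        for _, req := range stream {
            if req.KVBatch != nil {
                batches = append(batches, req.KVBatch)
            }
            logEntries = append(logEntries, req.LogEntries...)
            if req.Final {
                // This is the point where the diff builds IncomingSnapshot
                // and hands it to s.handleRaftRequest.
                fmt.Printf("dispatching: %d batches, %d log entries\n",
                    len(batches), len(logEntries))
            }
        }
    }
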
@@ -2269,50 +2288,68 @@ func (s *Store) HandleSnapshot(header *SnapshotRequest_Header, stream MultiRaft_
}
}
-// applySnapshot applies the snapshot given by the SnapshotRequest_Header
-// and the slice of KV BatchReprs and Raft log entries. Passing a true
-// addedPlaceholder indicates that a placeholder for this snapshot needs
-// to be removed once it is applied.
-func (s *Store) applySnapshot(
- ctx context.Context, header *SnapshotRequest_Header, batches [][]byte, logEntries [][]byte,
- addedPlaceholder bool,
-) *roachpb.Error {
+// HandleRaftRequest dispatches a raft message to the appropriate Replica. It
+// requires that s.processRaftMu and s.mu are not held.
+func (s *Store) HandleRaftRequest(ctx context.Context, req *RaftMessageRequest) *roachpb.Error {
+ return s.handleRaftRequest(s.logContext(ctx), req, IncomingSnapshot{})
+}
+
+// handleRaftRequest is the same as HandleRaftRequest except that it also
+// handles buffered streaming snapshots. If req.Message.Type is
+// raftpb.MsgSnap, inSnap must not be empty; in that case, inSnap will be
+// applied. Preemptive and raft-initiated snapshots are supported.
+func (s *Store) handleRaftRequest(ctx context.Context, req *RaftMessageRequest, inSnap IncomingSnapshot) *roachpb.Error {
s.processRaftMu.Lock()
defer s.processRaftMu.Unlock()
+ // Drop messages that come from a node that we believe was once
+ // a member of the group but has been removed.
s.mu.Lock()
- r, ok := s.mu.replicas[header.RangeDescriptor.RangeID]
- s.mu.Unlock()
- rmr := header.RaftMessageRequest
- if ok {
- if err := checkReplicaTooOld(r, rmr.FromReplica.ReplicaID); err != nil {
+ if r, ok := s.mu.replicas[req.RangeID]; ok {
+ if err := checkReplicaTooOld(r, req.FromReplica.ReplicaID); err != nil {
+ s.mu.Unlock()
return err
}
}
- s.mu.Lock()
+ s.metrics.raftRcvdMessages[req.Message.Type].Inc(1)
+
+ switch req.Message.Type {
+ case raftpb.MsgSnap:
+ if (inSnap.SnapUUID == uuid.UUID{}) {
+ log.Fatalf(ctx, "programming error: received %s via the Raft transport", raftpb.MsgSnap)
+ }
+
+ case raftpb.MsgHeartbeat:
+ // TODO(bdarnell): handle coalesced heartbeats.
+ }
+
// Lazily create the replica.
- r, err := s.getOrCreateReplicaLocked(header.RangeDescriptor.RangeID,
- rmr.ToReplica.ReplicaID, rmr.FromReplica)
+ r, err := s.getOrCreateReplicaLocked(req.RangeID, req.ToReplica.ReplicaID, req.FromReplica)
// TODO(bdarnell): is it safe to release the store lock here?
// It deadlocks to hold s.Mutex while calling raftGroup.Step.
s.mu.Unlock()
if err != nil {
return roachpb.NewError(err)
}
- r.setLastReplicaDescriptors(rmr)
- snapUUID, err := uuid.FromBytes(rmr.Message.Snapshot.Data)
- if err != nil {
- return roachpb.NewError(errors.Wrap(err, "invalid snapshot: unparseable uuid"))
- }
- inSnap := IncomingSnapshot{
- SnapUUID: *snapUUID,
- RangeDescriptor: header.RangeDescriptor,
- Batches: batches,
- LogEntries: logEntries,
- }
+ r.setLastReplicaDescriptors(*req)
+
+ if req.ToReplica.ReplicaID == 0 {
+ if req.Message.Type != raftpb.MsgSnap {
+ // We disallow non-snapshot messages to replica ID 0. Note that
+ // getOrCreateReplicaLocked disallows moving the replica ID backward, so
+ // the only way we can get here is if the replica did not previously exist.
+ if log.V(1) {
+ log.Infof(ctx, "refusing incoming Raft message %s for range %d from %+v to %+v",
+ req.Message.Type, req.RangeID, req.FromReplica, req.ToReplica)
+ }
+ return roachpb.NewErrorf(
+ "cannot recreate replica that is not a member of its range (StoreID %s not found in range %d)",
+ r.store.StoreID(),
+ req.RangeID,
+ )
+ }
- if rmr.ToReplica.ReplicaID == 0 {
// Allow snapshots to be applied to replicas before they are
// members of the raft group (i.e. replicas with an ID of 0). This
// is the only operation that can be performed before it is part of
@@ -2322,10 +2359,10 @@ func (s *Store) applySnapshot(
// get all of Raft's internal safety checks (it confuses messages
// at term zero for internal messages). The sending side uses the
// term from the snapshot itself, but we'll just check nonzero.
- if rmr.Message.Term == 0 {
+ if req.Message.Term == 0 {
return roachpb.NewErrorf(
"preemptive snapshot from term %d received with zero term",
- rmr.Message.Snapshot.Metadata.Term,
+ req.Message.Snapshot.Metadata.Term,
)
}
// TODO(tschottdorf): A lot of locking of the individual Replica
@@ -2387,7 +2424,7 @@ func (s *Store) applySnapshot(
return roachpb.NewError(err)
}
// We have a Raft group; feed it the message.
- if err := raftGroup.Step(header.RaftMessageRequest.Message); err != nil {
+ if err := raftGroup.Step(req.Message); err != nil {
return roachpb.NewError(errors.Wrap(err, "unable to process preemptive snapshot"))
}
// In the normal case, the group should ask us to apply a snapshot.
@@ -2409,9 +2446,9 @@ func (s *Store) applySnapshot(
s.mu.Lock()
defer s.mu.Unlock()
- if addedPlaceholder {
+ if inSnap.addedPlaceholder {
// Clear the replica placeholder; we are about to swap it with a real replica.
- if err := s.removePlaceholderLocked(header.RangeDescriptor.RangeID); err != nil {
+ if err := s.removePlaceholderLocked(inSnap.RangeDescriptor.RangeID); err != nil {
return roachpb.NewError(err)
}
}
@@ -2429,75 +2466,20 @@ func (s *Store) applySnapshot(
}
if err := r.withRaftGroup(func(raftGroup *raft.RawNode) error {
- return raftGroup.Step(header.RaftMessageRequest.Message)
+ return raftGroup.Step(req.Message)
}); err != nil {
return roachpb.NewError(err)
}
- // Force the replica to deal with this snapshot right now.
- if err := r.handleRaftReady(&inSnap); err != nil {
- return roachpb.NewError(errors.Wrap(err, "Didn't process snapshot."))
- }
- return nil
-}
-
-// HandleRaftRequest dispatches a raft message to the appropriate Replica. It
-// requires that s.processRaftMu and s.mu are not held.
-func (s *Store) HandleRaftRequest(ctx context.Context, req *RaftMessageRequest) *roachpb.Error {
- s.processRaftMu.Lock()
- defer s.processRaftMu.Unlock()
-
- ctx = s.logContext(ctx)
-
- // Drop messages that come from a node that we believe was once
- // a member of the group but has been removed.
- s.mu.Lock()
- if r, ok := s.mu.replicas[req.RangeID]; ok {
- if err := checkReplicaTooOld(r, req.FromReplica.ReplicaID); err != nil {
- s.mu.Unlock()
- return err
- }
- }
-
- s.metrics.raftRcvdMessages[req.Message.Type].Inc(1)
-
- switch req.Message.Type {
- case raftpb.MsgSnap:
- // All snapshots should be handled by the streaming endpoint.
- log.Fatal(ctx, "programming error: received a snapshot via the Raft endpoint")
-
- case raftpb.MsgHeartbeat:
- // TODO(bdarnell): handle coalesced heartbeats.
- }
-
- // Lazily create the replica.
- r, err := s.getOrCreateReplicaLocked(req.RangeID, req.ToReplica.ReplicaID, req.FromReplica)
- // TODO(bdarnell): is it safe to release the store lock here?
- // It deadlocks to hold s.Mutex while calling raftGroup.Step.
- s.mu.Unlock()
- if err != nil {
- return roachpb.NewError(err)
- }
- r.setLastReplicaDescriptors(*req)
- if req.ToReplica.ReplicaID == 0 {
- // We disallow non-snapshot messages to replica ID 0. Note that
- // getOrCreateReplicaLocked disallows moving the replica ID backward, so
- // the only way we can get here is if the replica did not previously exist.
- if log.V(1) {
- log.Infof(ctx, "refusing incoming Raft message %s for range %d from %+v to %+v",
- req.Message.Type, req.RangeID, req.FromReplica, req.ToReplica)
+ if (inSnap.SnapUUID == uuid.UUID{}) {
+ s.enqueueRaftUpdateCheck(req.RangeID)
+ } else {
+ // Force the replica to deal with this snapshot right now.
+ if err := r.handleRaftReady(inSnap); err != nil {
+ return roachpb.NewError(errors.Wrap(err, "didn't process snapshot"))
}
- return roachpb.NewErrorf("cannot recreate replica that is not a member of its range (StoreID %s not found in range %d)",
- r.store.StoreID(), req.RangeID)
- }
-
- if err := r.withRaftGroup(func(raftGroup *raft.RawNode) error {
- return raftGroup.Step(req.Message)
- }); err != nil {
- return roachpb.NewError(err)
- }
- s.enqueueRaftUpdateCheck(req.RangeID)
return nil
}
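
Annotation: the exported HandleRaftRequest is now a thin wrapper that passes an empty IncomingSnapshot, which is what lets the MsgSnap case assert that real snapshots only arrive via the streaming endpoint. A sketch of the wrapper shape (stand-in types; the diff's log.Fatalf is modeled here as a returned error):

    package main

    import (
        "context"
        "errors"
        "fmt"
    )

    type UUID [16]byte

    type incomingSnapshot struct{ SnapUUID UUID }

    type raftMessageRequest struct{ IsSnap bool }

    type store struct{}

    // HandleRaftRequest mirrors the exported entry point: raft transport
    // traffic never carries a buffered snapshot, so it passes the zero value.
    func (s *store) HandleRaftRequest(ctx context.Context, req *raftMessageRequest) error {
        return s.handleRaftRequest(ctx, req, incomingSnapshot{})
    }

    // handleRaftRequest is the shared path; a snapshot message with a zero
    // SnapUUID violates the invariant the diff enforces with log.Fatalf.
    func (s *store) handleRaftRequest(ctx context.Context, req *raftMessageRequest, inSnap incomingSnapshot) error {
        if req.IsSnap && (inSnap.SnapUUID == UUID{}) {
            return errors.New("programming error: received MsgSnap via the raft transport")
        }
        fmt.Println("processed message; snapshot attached:", req.IsSnap)
        return nil
    }

    func main() {
        s := &store{}
        _ = s.HandleRaftRequest(context.Background(), &raftMessageRequest{})
        fmt.Println(s.HandleRaftRequest(context.Background(), &raftMessageRequest{IsSnap: true}))
    }
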
@@ -2747,7 +2729,7 @@ func (s *Store) processRaft() {
// parallel.
for _, r := range uninitReplicas {
start := timeutil.Now()
- if err := r.handleRaftReady(nil); err != nil {
+ if err := r.handleRaftReady(IncomingSnapshot{}); err != nil {
panic(err) // TODO(bdarnell)
}
maybeWarnDuration(start, r, "handle raft ready")
@@ -2760,7 +2742,7 @@ func (s *Store) processRaft() {
workQueue <- func() {
defer wg.Done()
start := timeutil.Now()
- if err := r.handleRaftReady(nil); err != nil {
+ if err := r.handleRaftReady(IncomingSnapshot{}); err != nil {
panic(err) // TODO(bdarnell)
}
maybeWarnDuration(start, r, "handle raft ready")
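
Annotation: one last detail worth calling out is the tail of handleRaftRequest, which either enqueues the range for the background raft loop or, when a snapshot is attached, applies it synchronously so the stream can respond with APPLIED. A toy model of that branch:

    package main

    import "fmt"

    type UUID [16]byte

    // dispatch models the closing branch of handleRaftRequest: ordinary
    // messages are queued for the background loop (enqueueRaftUpdateCheck in
    // the real code), while a buffered snapshot is applied inline.
    func dispatch(snapUUID UUID, rangeID int64, queue chan<- int64, applyNow func() error) error {
        if (snapUUID == UUID{}) {
            queue <- rangeID // async: picked up by processRaft
            return nil
        }
        return applyNow() // sync: the diff's r.handleRaftReady(inSnap)
    }

    func main() {
        queue := make(chan int64, 1)

        _ = dispatch(UUID{}, 42, queue, nil)
        fmt.Println("queued range:", <-queue)

        _ = dispatch(UUID{0: 1}, 42, queue, func() error {
            fmt.Println("applied snapshot inline")
            return nil
        })
    }

Applying the snapshot inline keeps success or failure tied to the open stream, which is what allows HandleSnapshot to report APPLIED or an error back to the sender.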