Skip to content

Instantly share code, notes, and snippets.

@bleskes
Created January 18, 2017 17:10
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save bleskes/9177f512ebe803dc07dbfdda6f8ef2b7 to your computer and use it in GitHub Desktop.
Save bleskes/9177f512ebe803dc07dbfdda6f8ef2b7 to your computer and use it in GitHub Desktop.
diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java b/core/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java
index 5bb26b2..6c1f24c 100644
--- a/core/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java
+++ b/core/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java
@@ -41,7 +41,6 @@ import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.engine.RecoveryEngineException;
import org.elasticsearch.index.mapper.MapperException;
-import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.seqno.SequenceNumbersService;
import org.elasticsearch.index.shard.IllegalIndexShardStateException;
import org.elasticsearch.index.shard.IndexEventListener;
@@ -62,11 +61,9 @@ import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
-import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.Function;
import static org.elasticsearch.common.unit.TimeValue.timeValueMillis;
@@ -176,17 +173,20 @@ public class PeerRecoveryTargetService extends AbstractComponent implements Inde
return;
}
final RecoveryTarget recoveryTarget = recoveryRef.target();
- assert recoveryTarget.sourceNode() != null : "can't do a recovery without a source node";
-
- final Optional<StartRecoveryRequest> maybeRequest = getStartRecoveryRequest(recoveryTarget);
- if (!maybeRequest.isPresent()) return;
- else request = maybeRequest.get();
-
cancellableThreads = recoveryTarget.cancellableThreads();
timer = recoveryTarget.state().getTimer();
-
- logger.trace("{} preparing shard for peer recovery", recoveryTarget.shardId());
- recoveryTarget.indexShard().prepareForIndexRecovery();
+ try {
+ assert recoveryTarget.sourceNode() != null : "can't do a recovery without a source node";
+ request = getStartRecoveryRequest(recoveryTarget);
+ logger.trace("{} preparing shard for peer recovery", recoveryTarget.shardId());
+ recoveryTarget.indexShard().prepareForIndexRecovery();
+ } catch (Exception e) {
+ // this will be logged as warning later on...
+ logger.trace("unexpected error while preparing shard for peer recovery, failing recovery", e);
+ onGoingRecoveries.failRecovery(recoveryId,
+ new RecoveryFailedException(recoveryTarget.state(), "failed to prepare shard for recovery", e), true);
+ return;
+ }
}
try {
@@ -290,7 +290,7 @@ public class PeerRecoveryTargetService extends AbstractComponent implements Inde
}
@SuppressWarnings("OptionalUsedAsFieldOrParameterType")
- private static Optional<Store.MetadataSnapshot> EMPTY_METADATA_SNAPSHOT = Optional.of(Store.MetadataSnapshot.EMPTY);
+ private static Store.MetadataSnapshot EMPTY_METADATA_SNAPSHOT = Store.MetadataSnapshot.EMPTY;
/**
* Obtains a snapshot of the store metadata for the recovery target, or an empty {@link Optional} if obtaining the store metadata
@@ -299,14 +299,14 @@ public class PeerRecoveryTargetService extends AbstractComponent implements Inde
* @param recoveryTarget the target of the recovery
* @return a snapshot of the store metdata, or an empty {@link Optional}
*/
- private Optional<Store.MetadataSnapshot> getStoreMetadataSnapshot(final RecoveryTarget recoveryTarget) {
+ private Store.MetadataSnapshot getStoreMetadataSnapshot(final RecoveryTarget recoveryTarget) {
try {
if (recoveryTarget.indexShard().indexSettings().isOnSharedFilesystem()) {
// we are not going to copy any files, so don't bother listing files, potentially running into concurrency issues with the
// primary changing files underneath us
return EMPTY_METADATA_SNAPSHOT;
} else {
- return Optional.of(recoveryTarget.indexShard().snapshotStoreMetadata());
+ return recoveryTarget.indexShard().snapshotStoreMetadata();
}
} catch (org.apache.lucene.index.IndexNotFoundException e) {
// happens on an empty folder. no need to log
@@ -315,12 +315,6 @@ public class PeerRecoveryTargetService extends AbstractComponent implements Inde
} catch (final IOException e) {
logger.warn("error while listing local files, recover as if there are none", e);
return EMPTY_METADATA_SNAPSHOT;
- } catch (final Exception e) {
- // this will be logged as warning later on...
- logger.trace("unexpected error while listing local files, failing recovery", e);
- onGoingRecoveries.failRecovery(recoveryTarget.recoveryId(),
- new RecoveryFailedException(recoveryTarget.state(), "failed to list local files", e), true);
- return Optional.empty();
}
}
@@ -330,49 +324,39 @@ public class PeerRecoveryTargetService extends AbstractComponent implements Inde
* @param recoveryTarget the target of the recovery
* @return a start recovery request, or an empty {@link Optional}
*/
- private Optional<StartRecoveryRequest> getStartRecoveryRequest(final RecoveryTarget recoveryTarget) {
+ private StartRecoveryRequest getStartRecoveryRequest(final RecoveryTarget recoveryTarget) {
final StartRecoveryRequest request;
logger.trace("{} collecting local files for [{}]", recoveryTarget.shardId(), recoveryTarget.sourceNode());
- final Optional<Store.MetadataSnapshot> metadataSnapshot = getStoreMetadataSnapshot(recoveryTarget);
- if (!metadataSnapshot.isPresent()) return Optional.empty();
- logger.trace("{} local file count [{}]", recoveryTarget.shardId(), metadataSnapshot.get().size());
+ final Store.MetadataSnapshot metadataSnapshot = getStoreMetadataSnapshot(recoveryTarget);
+ logger.trace("{} local file count [{}]", recoveryTarget.shardId(), metadataSnapshot.size());
- try {
- final long startingSeqNo;
- if (metadataSnapshot.get().size() > 0) {
- startingSeqNo = getStartingSeqNo(recoveryTarget);
- } else {
- startingSeqNo = SequenceNumbersService.UNASSIGNED_SEQ_NO;
- }
-
- if (startingSeqNo == SequenceNumbersService.UNASSIGNED_SEQ_NO) {
- logger.trace("{} preparing for file-based recovery from [{}]", recoveryTarget.shardId(), recoveryTarget.sourceNode());
- } else {
- logger.trace(
- "{} preparing for sequence number-based recovery starting at local checkpoint [{}] from [{}]",
- recoveryTarget.shardId(),
- startingSeqNo,
- recoveryTarget.sourceNode());
- }
+ final long startingSeqNo;
+ if (metadataSnapshot.size() > 0) {
+ startingSeqNo = getStartingSeqNo(recoveryTarget);
+ } else {
+ startingSeqNo = SequenceNumbersService.UNASSIGNED_SEQ_NO;
+ }
- request = new StartRecoveryRequest(
+ if (startingSeqNo == SequenceNumbersService.UNASSIGNED_SEQ_NO) {
+ logger.trace("{} preparing for file-based recovery from [{}]", recoveryTarget.shardId(), recoveryTarget.sourceNode());
+ } else {
+ logger.trace(
+ "{} preparing for sequence number-based recovery starting at local checkpoint [{}] from [{}]",
recoveryTarget.shardId(),
- recoveryTarget.sourceNode(),
- clusterService.localNode(),
- metadataSnapshot.get(),
- recoveryTarget.state().getPrimary(),
- recoveryTarget.recoveryId(),
- startingSeqNo);
-
- } catch (final Exception e) {
- // this will be logged as warning later on...
- logger.trace("unexpected error while preparing shard for peer recovery, failing recovery", e);
- onGoingRecoveries.failRecovery(recoveryTarget.recoveryId(),
- new RecoveryFailedException(recoveryTarget.state(), "failed to prepare shard for recovery", e), true);
- return Optional.empty();
+ startingSeqNo,
+ recoveryTarget.sourceNode());
}
- return Optional.of(request);
+
+ request = new StartRecoveryRequest(
+ recoveryTarget.shardId(),
+ recoveryTarget.sourceNode(),
+ clusterService.localNode(),
+ metadataSnapshot,
+ recoveryTarget.state().getPrimary(),
+ recoveryTarget.recoveryId(),
+ startingSeqNo);
+ return request;
}
/**
diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java b/core/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java
index 5bb26b2..6c1f24c 100644
--- a/core/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java
+++ b/core/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java
@@ -41,7 +41,6 @@ import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.engine.RecoveryEngineException;
import org.elasticsearch.index.mapper.MapperException;
-import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.seqno.SequenceNumbersService;
import org.elasticsearch.index.shard.IllegalIndexShardStateException;
import org.elasticsearch.index.shard.IndexEventListener;
@@ -62,11 +61,9 @@ import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
-import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.Function;
import static org.elasticsearch.common.unit.TimeValue.timeValueMillis;
@@ -176,17 +173,20 @@ public class PeerRecoveryTargetService extends AbstractComponent implements Inde
return;
}
final RecoveryTarget recoveryTarget = recoveryRef.target();
- assert recoveryTarget.sourceNode() != null : "can't do a recovery without a source node";
-
- final Optional<StartRecoveryRequest> maybeRequest = getStartRecoveryRequest(recoveryTarget);
- if (!maybeRequest.isPresent()) return;
- else request = maybeRequest.get();
-
cancellableThreads = recoveryTarget.cancellableThreads();
timer = recoveryTarget.state().getTimer();
-
- logger.trace("{} preparing shard for peer recovery", recoveryTarget.shardId());
- recoveryTarget.indexShard().prepareForIndexRecovery();
+ try {
+ assert recoveryTarget.sourceNode() != null : "can't do a recovery without a source node";
+ request = getStartRecoveryRequest(recoveryTarget);
+ logger.trace("{} preparing shard for peer recovery", recoveryTarget.shardId());
+ recoveryTarget.indexShard().prepareForIndexRecovery();
+ } catch (Exception e) {
+ // this will be logged as warning later on...
+ logger.trace("unexpected error while preparing shard for peer recovery, failing recovery", e);
+ onGoingRecoveries.failRecovery(recoveryId,
+ new RecoveryFailedException(recoveryTarget.state(), "failed to prepare shard for recovery", e), true);
+ return;
+ }
}
try {
@@ -290,7 +290,7 @@ public class PeerRecoveryTargetService extends AbstractComponent implements Inde
}
@SuppressWarnings("OptionalUsedAsFieldOrParameterType")
- private static Optional<Store.MetadataSnapshot> EMPTY_METADATA_SNAPSHOT = Optional.of(Store.MetadataSnapshot.EMPTY);
+ private static Store.MetadataSnapshot EMPTY_METADATA_SNAPSHOT = Store.MetadataSnapshot.EMPTY;
/**
* Obtains a snapshot of the store metadata for the recovery target, or an empty {@link Optional} if obtaining the store metadata
@@ -299,14 +299,14 @@ public class PeerRecoveryTargetService extends AbstractComponent implements Inde
* @param recoveryTarget the target of the recovery
* @return a snapshot of the store metdata, or an empty {@link Optional}
*/
- private Optional<Store.MetadataSnapshot> getStoreMetadataSnapshot(final RecoveryTarget recoveryTarget) {
+ private Store.MetadataSnapshot getStoreMetadataSnapshot(final RecoveryTarget recoveryTarget) {
try {
if (recoveryTarget.indexShard().indexSettings().isOnSharedFilesystem()) {
// we are not going to copy any files, so don't bother listing files, potentially running into concurrency issues with the
// primary changing files underneath us
return EMPTY_METADATA_SNAPSHOT;
} else {
- return Optional.of(recoveryTarget.indexShard().snapshotStoreMetadata());
+ return recoveryTarget.indexShard().snapshotStoreMetadata();
}
} catch (org.apache.lucene.index.IndexNotFoundException e) {
// happens on an empty folder. no need to log
@@ -315,12 +315,6 @@ public class PeerRecoveryTargetService extends AbstractComponent implements Inde
} catch (final IOException e) {
logger.warn("error while listing local files, recover as if there are none", e);
return EMPTY_METADATA_SNAPSHOT;
- } catch (final Exception e) {
- // this will be logged as warning later on...
- logger.trace("unexpected error while listing local files, failing recovery", e);
- onGoingRecoveries.failRecovery(recoveryTarget.recoveryId(),
- new RecoveryFailedException(recoveryTarget.state(), "failed to list local files", e), true);
- return Optional.empty();
}
}
@@ -330,49 +324,39 @@ public class PeerRecoveryTargetService extends AbstractComponent implements Inde
* @param recoveryTarget the target of the recovery
* @return a start recovery request, or an empty {@link Optional}
*/
- private Optional<StartRecoveryRequest> getStartRecoveryRequest(final RecoveryTarget recoveryTarget) {
+ private StartRecoveryRequest getStartRecoveryRequest(final RecoveryTarget recoveryTarget) {
final StartRecoveryRequest request;
logger.trace("{} collecting local files for [{}]", recoveryTarget.shardId(), recoveryTarget.sourceNode());
- final Optional<Store.MetadataSnapshot> metadataSnapshot = getStoreMetadataSnapshot(recoveryTarget);
- if (!metadataSnapshot.isPresent()) return Optional.empty();
- logger.trace("{} local file count [{}]", recoveryTarget.shardId(), metadataSnapshot.get().size());
+ final Store.MetadataSnapshot metadataSnapshot = getStoreMetadataSnapshot(recoveryTarget);
+ logger.trace("{} local file count [{}]", recoveryTarget.shardId(), metadataSnapshot.size());
- try {
- final long startingSeqNo;
- if (metadataSnapshot.get().size() > 0) {
- startingSeqNo = getStartingSeqNo(recoveryTarget);
- } else {
- startingSeqNo = SequenceNumbersService.UNASSIGNED_SEQ_NO;
- }
-
- if (startingSeqNo == SequenceNumbersService.UNASSIGNED_SEQ_NO) {
- logger.trace("{} preparing for file-based recovery from [{}]", recoveryTarget.shardId(), recoveryTarget.sourceNode());
- } else {
- logger.trace(
- "{} preparing for sequence number-based recovery starting at local checkpoint [{}] from [{}]",
- recoveryTarget.shardId(),
- startingSeqNo,
- recoveryTarget.sourceNode());
- }
+ final long startingSeqNo;
+ if (metadataSnapshot.size() > 0) {
+ startingSeqNo = getStartingSeqNo(recoveryTarget);
+ } else {
+ startingSeqNo = SequenceNumbersService.UNASSIGNED_SEQ_NO;
+ }
- request = new StartRecoveryRequest(
+ if (startingSeqNo == SequenceNumbersService.UNASSIGNED_SEQ_NO) {
+ logger.trace("{} preparing for file-based recovery from [{}]", recoveryTarget.shardId(), recoveryTarget.sourceNode());
+ } else {
+ logger.trace(
+ "{} preparing for sequence number-based recovery starting at local checkpoint [{}] from [{}]",
recoveryTarget.shardId(),
- recoveryTarget.sourceNode(),
- clusterService.localNode(),
- metadataSnapshot.get(),
- recoveryTarget.state().getPrimary(),
- recoveryTarget.recoveryId(),
- startingSeqNo);
-
- } catch (final Exception e) {
- // this will be logged as warning later on...
- logger.trace("unexpected error while preparing shard for peer recovery, failing recovery", e);
- onGoingRecoveries.failRecovery(recoveryTarget.recoveryId(),
- new RecoveryFailedException(recoveryTarget.state(), "failed to prepare shard for recovery", e), true);
- return Optional.empty();
+ startingSeqNo,
+ recoveryTarget.sourceNode());
}
- return Optional.of(request);
+
+ request = new StartRecoveryRequest(
+ recoveryTarget.shardId(),
+ recoveryTarget.sourceNode(),
+ clusterService.localNode(),
+ metadataSnapshot,
+ recoveryTarget.state().getPrimary(),
+ recoveryTarget.recoveryId(),
+ startingSeqNo);
+ return request;
}
/**
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment