Created
February 7, 2018 12:20
-
-
Save tteofili/f60bd633557b93be106dc8e806d2b8fa to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.lucene.index; | |
import java.io.IOException; | |
/** | |
* This {@link MergePolicy} extends Lucene's {@link org.apache.lucene.index.TieredMergePolicy} by throttling | |
* merge operations in case the index is under high commit load. | |
* With this {@link MergePolicy} less frequent but bigger merges are expected, only when commit rate | |
* is under a certain threshold (in terms of added docs per sec and MBs per sec). | |
* <p> | |
* Auto tuning params: | |
* Max commit rate parameters (e.g. docs / MB per sec) would rather not be changed by hand, instead this implementation tries | |
* to adapt to the average commit load for each particular case (using single exponential smoothing for time series). | |
* Anyway, throttling should not make the no. of segments go beyond a certain (configurable) threshold. | |
* In case the no. of segments is above such a threshold, no throttling is done and merging happens as per plain {@link TieredMergePolicy} algorithm. | |
*/ | |
public class ThrottlingTieredMergePolicy extends TieredMergePolicy { | |
private static final double DEFAULT_MAX_COMMIT_RATE_DOCS = 1000; | |
private static final double DEFAULT_TIMESERIES_LENGTH = 5d; | |
private static final double DEFAULT_MAX_COMMIT_RATE_MB = 5; | |
private static final int DEFAULT_MAX_NO_OF_SEGS_FOR_THROTTLING = 20; | |
private static final double DEFAULT_SMOOTHING_ALPHA = 0.7; | |
/** | |
* max docs per second rate allowed for throttling | |
*/ | |
private double maxCommitRateDocs = DEFAULT_MAX_COMMIT_RATE_DOCS; | |
/** | |
* max mb per second rate allowed for throttling | |
*/ | |
private double maxCommitRateMB = DEFAULT_MAX_COMMIT_RATE_MB; | |
/** | |
* max no. of segments allowed for throttling to happen | |
*/ | |
private int maxNoOfSegsForThrottling = DEFAULT_MAX_NO_OF_SEGS_FOR_THROTTLING; | |
/** | |
* initial values for time series | |
*/ | |
private double avgCommitRateDocs = 0d; | |
private double avgCommitRateMB = 0d; | |
private double avgSegs = 0; | |
/** | |
* length of time series analysis for commit rate and no. of segments | |
*/ | |
private double timeSeriesLength = DEFAULT_TIMESERIES_LENGTH; | |
/** | |
* single exponential smoothing ratio (0 < alpha < 1) | |
* <p> | |
* values towards 0 tend to give more weight to past inputs | |
* values close to 1 weigh recent values more | |
*/ | |
private double alpha = DEFAULT_SMOOTHING_ALPHA; | |
/** | |
* current step in current time series batch | |
*/ | |
private double timeSeriesCount = 0d; | |
/** | |
* current maxDoc count | |
*/ | |
private double docCount = 0d; | |
/** | |
* current segments MB count | |
*/ | |
private double mb = 0d; | |
/** | |
* current time | |
*/ | |
private double time = System.currentTimeMillis(); | |
/** | |
* single exponential smoothing | |
* | |
* @param input current time series value | |
* @param smoothedValue previous smoothed value | |
* @return the new smoothed value | |
*/ | |
private double singleExpSmoothing(double input, double smoothedValue) { | |
return alpha * input + (1 - alpha) * smoothedValue; | |
} | |
/** | |
* Set the max number of MB per sec allowed before throttling kicks in | |
* | |
* @param maxCommitRateMB max MB per sec | |
* @return this instance | |
*/ | |
public ThrottlingTieredMergePolicy setMaxCommitRateMB(double maxCommitRateMB) { | |
this.maxCommitRateMB = maxCommitRateMB; | |
return this; | |
} | |
/** | |
* Set the max number of added docs per sec allowed before throttling kicks in | |
* | |
* @param maxCommitRateDocs max docs per sec | |
* @return this instance | |
*/ | |
public ThrottlingTieredMergePolicy setMaxCommitRateDocs(double maxCommitRateDocs) { | |
this.maxCommitRateDocs = maxCommitRateDocs; | |
return this; | |
} | |
/** | |
* Set the number of allowed segments for throttling to happen | |
* | |
* @param maxNoOfSegsForThrottling maximum number of segments | |
* @return this instance | |
*/ | |
public ThrottlingTieredMergePolicy setMaxNoOfSegsForThrottling(int maxNoOfSegsForThrottling) { | |
this.maxNoOfSegsForThrottling = maxNoOfSegsForThrottling; | |
return this; | |
} | |
/** | |
* Set the number of points to keep in a commit rate analysis time series. | |
* A point is added each time #findMerges is called, default value is #DEFAULT_TIMESERIES_LENGTH | |
* | |
* @param timeSeriesLength the length of the time series | |
* @return this instance | |
*/ | |
public ThrottlingTieredMergePolicy setTimeSeriesLength(double timeSeriesLength) { | |
this.timeSeriesLength = timeSeriesLength; | |
return this; | |
} | |
/** | |
* set single exponential smoothing alpha parameter | |
* | |
* @param alpha the alpha parameter | |
* @return this instance | |
*/ | |
public ThrottlingTieredMergePolicy setAlpha(double alpha) { | |
this.alpha = alpha; | |
return this; | |
} | |
@Override | |
public MergeSpecification findMerges(MergeTrigger mergeTrigger, SegmentInfos infos, IndexWriter writer) throws IOException { | |
int segmentSize = infos.size(); | |
timeSeriesCount++; | |
if (timeSeriesCount % timeSeriesLength == 0) { | |
// reset averages | |
avgCommitRateDocs = 0d; | |
avgCommitRateMB = 0d; | |
avgSegs = 0d; | |
} | |
avgSegs = singleExpSmoothing(segmentSize, avgSegs); | |
if (verbose(writer)) { | |
message("findMerges: " + segmentSize + " segments, " + avgSegs + " average", writer); | |
} | |
if (segmentSize == 0) { | |
return null; | |
} | |
// if no. of segments exceeds the maximum, adjust the maximum rates to allow more merges (less commit/rate throttling) | |
if (segmentSize > maxNoOfSegsForThrottling) { | |
if (avgCommitRateDocs > maxCommitRateDocs) { | |
double v = singleExpSmoothing(avgCommitRateDocs, maxCommitRateDocs); | |
if (verbose(writer)) { | |
message("adjusting maxCommitRateDocs from " + maxCommitRateDocs + " to " + v, writer); | |
} | |
maxCommitRateDocs = v; | |
} | |
if (avgCommitRateMB > maxCommitRateMB) { | |
double v = singleExpSmoothing(avgCommitRateMB, maxCommitRateMB); | |
if (verbose(writer)) { | |
message("adjusting maxCommitRateMB from " + maxCommitRateMB + " to " + v, writer); | |
} | |
maxCommitRateMB = v; | |
} | |
} | |
long now = System.currentTimeMillis(); | |
double timeDelta = (now / 1000d) - (time / 1000d); | |
double commitRate = Math.abs(docCount - infos.totalMaxDoc()) / timeDelta; | |
time = now; | |
avgCommitRateDocs = singleExpSmoothing(commitRate, avgCommitRateDocs); | |
if (verbose(writer)) { | |
message("commit rate: current " + commitRate + ", average " + avgCommitRateDocs + ", max " + maxCommitRateDocs + " docs/sec", writer); | |
} | |
docCount = infos.totalMaxDoc(); | |
// do not throttle if there're too many segments to avoid affecting performance | |
if (commitRate > maxCommitRateDocs && segmentSize < maxNoOfSegsForThrottling) { | |
if (verbose(writer)) { | |
message("throttling merges : " + commitRate + " > " + maxCommitRateDocs + " docs/sec and segments " + segmentSize + " < " + maxNoOfSegsForThrottling + ")", writer); | |
} | |
return null; | |
} | |
double idxBytes = 0; | |
for (SegmentCommitInfo info : infos) { | |
idxBytes += info.sizeInBytes(); | |
} | |
double bytes = idxBytes - this.mb; | |
double mbRate = bytes / timeDelta; | |
avgCommitRateMB = singleExpSmoothing(mbRate, avgCommitRateMB); | |
if (verbose(writer)) { | |
message("commit rate: current " + mbRate + ", average " + avgCommitRateMB + ", max " + maxCommitRateMB + " MB/sec", writer); | |
} | |
this.mb = idxBytes; | |
// do not throttle if there're too many segments to avoid affecting performance | |
if (mbRate > maxCommitRateMB && segmentSize < maxNoOfSegsForThrottling) { | |
if (verbose(writer)) { | |
message("throttling merges : " + mbRate + " > " + maxCommitRateMB + " MB/sec and segments " + segmentSize + " < " + maxNoOfSegsForThrottling + ")", writer); | |
} | |
return null; | |
} | |
return super.findMerges(mergeTrigger, infos, writer); | |
} | |
private boolean verbose(IndexWriter writer) { | |
return writer != null && writer.infoStream.isEnabled("TMP"); | |
} | |
private void message(String message, IndexWriter writer) { | |
writer.infoStream.message("TMP", message); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment