Skip to content

Instantly share code, notes, and snippets.

@tteofili
Created February 7, 2018 12:20
Show Gist options
  • Save tteofili/f60bd633557b93be106dc8e806d2b8fa to your computer and use it in GitHub Desktop.
Save tteofili/f60bd633557b93be106dc8e806d2b8fa to your computer and use it in GitHub Desktop.
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.lucene.index;
import java.io.IOException;
/**
 * This {@link MergePolicy} extends Lucene's {@link org.apache.lucene.index.TieredMergePolicy} by throttling
 * merge operations when the index is under high commit load.
 * With this {@link MergePolicy}, merges are expected to be less frequent but bigger: they only happen when the
 * commit rate is under a certain threshold (in terms of added docs per second and MB per second).
 * <p>
 * Auto-tuning parameters:
 * the maximum commit rate parameters (docs / MB per second) are better left untouched; instead, this
 * implementation tries to adapt them to the average commit load of each particular case (using single
 * exponential smoothing over a time series).
 * However, throttling should not push the number of segments beyond a certain (configurable) threshold:
 * when the number of segments reaches that threshold, no throttling is done and merging happens as per the
 * plain {@link TieredMergePolicy} algorithm.
 */
public class ThrottlingTieredMergePolicy extends TieredMergePolicy {
  private static final double DEFAULT_MAX_COMMIT_RATE_DOCS = 1000;
  private static final double DEFAULT_TIMESERIES_LENGTH = 5d;
  private static final double DEFAULT_MAX_COMMIT_RATE_MB = 5;
  private static final int DEFAULT_MAX_NO_OF_SEGS_FOR_THROTTLING = 20;
  private static final double DEFAULT_SMOOTHING_ALPHA = 0.7;

  /**
   * number of bytes in one MB, used to convert segment sizes (reported in bytes) to MB
   */
  private static final double BYTES_PER_MB = 1024d * 1024d;

  /**
   * smallest elapsed time (in seconds) used as a rate denominator, so that two calls happening within the
   * clock resolution never divide by zero (which would produce Infinity/NaN rates and poison the averages)
   */
  private static final double MIN_ELAPSED_SECS = 1e-3;

  /**
   * max docs per second rate allowed for throttling
   */
  private double maxCommitRateDocs = DEFAULT_MAX_COMMIT_RATE_DOCS;

  /**
   * max MB per second rate allowed for throttling
   */
  private double maxCommitRateMB = DEFAULT_MAX_COMMIT_RATE_MB;

  /**
   * max number of segments allowed for throttling to happen
   */
  private int maxNoOfSegsForThrottling = DEFAULT_MAX_NO_OF_SEGS_FOR_THROTTLING;

  /**
   * smoothed values of the time series (docs/sec, MB/sec, number of segments); they are reset to 0 at the
   * start of each time series batch
   */
  private double avgCommitRateDocs = 0d;
  private double avgCommitRateMB = 0d;
  private double avgSegs = 0d;

  /**
   * length of time series analysis for commit rate and number of segments
   */
  private double timeSeriesLength = DEFAULT_TIMESERIES_LENGTH;

  /**
   * single exponential smoothing ratio (0 &lt; alpha &lt; 1)
   * <p>
   * values towards 0 tend to give more weight to past inputs,
   * values close to 1 weigh recent values more
   */
  private double alpha = DEFAULT_SMOOTHING_ALPHA;

  /**
   * number of {@link #findMerges} calls so far; used to detect the start of a new time series batch
   */
  private double timeSeriesCount = 0d;

  /**
   * total maxDoc of the index observed at the previous measurement
   */
  private double docCount = 0d;

  /**
   * total index size (in MB) observed at the previous measurement
   */
  private double indexMB = 0d;

  /**
   * {@link System#nanoTime()} at the previous measurement (or at construction). A monotonic clock is used
   * because elapsed time measured with the wall clock can go backwards or jump on clock adjustments.
   */
  private long lastMeasureNanos = System.nanoTime();

  /**
   * single exponential smoothing
   *
   * @param input         current time series value
   * @param smoothedValue previous smoothed value
   * @return the new smoothed value
   */
  private double singleExpSmoothing(double input, double smoothedValue) {
    return alpha * input + (1 - alpha) * smoothedValue;
  }

  /**
   * Set the max number of MB per sec allowed before throttling kicks in
   *
   * @param maxCommitRateMB max MB per sec, must be &gt;= 0
   * @return this instance
   * @throws IllegalArgumentException if {@code maxCommitRateMB} is negative or NaN
   */
  public ThrottlingTieredMergePolicy setMaxCommitRateMB(double maxCommitRateMB) {
    if (!(maxCommitRateMB >= 0)) {
      throw new IllegalArgumentException("maxCommitRateMB must be >= 0 (got " + maxCommitRateMB + ")");
    }
    this.maxCommitRateMB = maxCommitRateMB;
    return this;
  }

  /**
   * Set the max number of added docs per sec allowed before throttling kicks in
   *
   * @param maxCommitRateDocs max docs per sec, must be &gt;= 0
   * @return this instance
   * @throws IllegalArgumentException if {@code maxCommitRateDocs} is negative or NaN
   */
  public ThrottlingTieredMergePolicy setMaxCommitRateDocs(double maxCommitRateDocs) {
    if (!(maxCommitRateDocs >= 0)) {
      throw new IllegalArgumentException("maxCommitRateDocs must be >= 0 (got " + maxCommitRateDocs + ")");
    }
    this.maxCommitRateDocs = maxCommitRateDocs;
    return this;
  }

  /**
   * Set the number of segments below which throttling may happen; with this many segments or more,
   * merges are never throttled.
   *
   * @param maxNoOfSegsForThrottling maximum number of segments
   * @return this instance
   */
  public ThrottlingTieredMergePolicy setMaxNoOfSegsForThrottling(int maxNoOfSegsForThrottling) {
    this.maxNoOfSegsForThrottling = maxNoOfSegsForThrottling;
    return this;
  }

  /**
   * Set the number of points to keep in a commit rate analysis time series.
   * A point is added each time {@link #findMerges} is called; the default value is
   * {@code DEFAULT_TIMESERIES_LENGTH} (5).
   *
   * @param timeSeriesLength the length of the time series, must be &gt; 0
   * @return this instance
   * @throws IllegalArgumentException if {@code timeSeriesLength} is not positive (or NaN)
   */
  public ThrottlingTieredMergePolicy setTimeSeriesLength(double timeSeriesLength) {
    if (!(timeSeriesLength > 0)) {
      throw new IllegalArgumentException("timeSeriesLength must be > 0 (got " + timeSeriesLength + ")");
    }
    this.timeSeriesLength = timeSeriesLength;
    return this;
  }

  /**
   * Set the single exponential smoothing alpha parameter.
   *
   * @param alpha the alpha parameter, must satisfy 0 &lt; alpha &lt; 1
   * @return this instance
   * @throws IllegalArgumentException if {@code alpha} is outside (0, 1) or NaN
   */
  public ThrottlingTieredMergePolicy setAlpha(double alpha) {
    if (!(alpha > 0 && alpha < 1)) {
      throw new IllegalArgumentException("alpha must be in (0, 1) (got " + alpha + ")");
    }
    this.alpha = alpha;
    return this;
  }

  @Override
  public MergeSpecification findMerges(MergeTrigger mergeTrigger, SegmentInfos infos, IndexWriter writer) throws IOException {
    int segmentSize = infos.size();
    timeSeriesCount++;
    if (timeSeriesCount % timeSeriesLength == 0) {
      // start of a new time series batch: reset averages
      avgCommitRateDocs = 0d;
      avgCommitRateMB = 0d;
      avgSegs = 0d;
    }
    avgSegs = singleExpSmoothing(segmentSize, avgSegs);
    if (verbose(writer)) {
      message("findMerges: " + segmentSize + " segments, " + avgSegs + " average", writer);
    }
    if (segmentSize == 0) {
      return null;
    }
    // if the number of segments exceeds the maximum, raise the maximum rates to allow more merges
    // (less commit rate throttling); this uses the averages accumulated up to the previous call
    if (segmentSize > maxNoOfSegsForThrottling) {
      relaxMaxCommitRates(writer);
    }

    long now = System.nanoTime();
    double timeDelta = Math.max((now - lastMeasureNanos) / 1e9d, MIN_ELAPSED_SECS);
    lastMeasureNanos = now;

    // NOTE(review): docCount and indexMB start at 0, so the first measurement counts any pre-existing index
    // content as commit load; consider seeding these baselines on the first call.
    double totalMaxDoc = infos.totalMaxDoc();
    double commitRate = Math.abs(docCount - totalMaxDoc) / timeDelta;
    docCount = totalMaxDoc;
    avgCommitRateDocs = singleExpSmoothing(commitRate, avgCommitRateDocs);
    if (verbose(writer)) {
      message("commit rate: current " + commitRate + ", average " + avgCommitRateDocs + ", max " + maxCommitRateDocs + " docs/sec", writer);
    }

    // the MB rate (and its baseline) is updated on every call, even when the docs rate alone would throttle;
    // otherwise the next call would divide growth accumulated over several intervals by a single interval
    double currentIndexMB = indexSizeMB(infos);
    double mbRate = (currentIndexMB - indexMB) / timeDelta;
    indexMB = currentIndexMB;
    avgCommitRateMB = singleExpSmoothing(mbRate, avgCommitRateMB);
    if (verbose(writer)) {
      message("commit rate: current " + mbRate + ", average " + avgCommitRateMB + ", max " + maxCommitRateMB + " MB/sec", writer);
    }

    // do not throttle if there are too many segments, to avoid affecting performance
    if (segmentSize < maxNoOfSegsForThrottling) {
      if (commitRate > maxCommitRateDocs) {
        if (verbose(writer)) {
          message("throttling merges : " + commitRate + " > " + maxCommitRateDocs + " docs/sec and segments " + segmentSize + " < " + maxNoOfSegsForThrottling, writer);
        }
        return null;
      }
      if (mbRate > maxCommitRateMB) {
        if (verbose(writer)) {
          message("throttling merges : " + mbRate + " > " + maxCommitRateMB + " MB/sec and segments " + segmentSize + " < " + maxNoOfSegsForThrottling, writer);
        }
        return null;
      }
    }
    return super.findMerges(mergeTrigger, infos, writer);
  }

  /**
   * Moves each max commit rate towards the corresponding average commit rate, when the average exceeds it,
   * so that throttling becomes less aggressive.
   */
  private void relaxMaxCommitRates(IndexWriter writer) {
    if (avgCommitRateDocs > maxCommitRateDocs) {
      double v = singleExpSmoothing(avgCommitRateDocs, maxCommitRateDocs);
      if (verbose(writer)) {
        message("adjusting maxCommitRateDocs from " + maxCommitRateDocs + " to " + v, writer);
      }
      maxCommitRateDocs = v;
    }
    if (avgCommitRateMB > maxCommitRateMB) {
      double v = singleExpSmoothing(avgCommitRateMB, maxCommitRateMB);
      if (verbose(writer)) {
        message("adjusting maxCommitRateMB from " + maxCommitRateMB + " to " + v, writer);
      }
      maxCommitRateMB = v;
    }
  }

  /**
   * Returns the total size of all segments in {@code infos}, in MB.
   */
  private static double indexSizeMB(SegmentInfos infos) throws IOException {
    double bytes = 0d;
    for (SegmentCommitInfo info : infos) {
      bytes += info.sizeInBytes();
    }
    return bytes / BYTES_PER_MB;
  }

  private boolean verbose(IndexWriter writer) {
    return writer != null && writer.infoStream.isEnabled("TMP");
  }

  private void message(String message, IndexWriter writer) {
    writer.infoStream.message("TMP", message);
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment