/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.lucene.index;
import com.google.common.util.concurrent.MoreExecutors;
import org.apache.lucene.util.ArrayUtil;
import org.apache.lucene.util.CollectionUtil;
import org.elasticsearch.common.collect.IdentityHashSet;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
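
/**
 * A {@link MergeScheduler} that submits each merge selected by the
 * {@link MergePolicy} to the given {@link ExecutorService}. Indexing threads
 * block once more than maxPendingMerges merges are queued, and when more than
 * maxRunningMerges merges are executing, the largest ones (by total document
 * count) are paused in favor of the smallest.
 */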
public class AbstractMergeScheduler extends MergeScheduler {

    private final AtomicInteger numPendingMerges = new AtomicInteger();
    private final AtomicInteger numRunningMerges = new AtomicInteger();
    private final Set<MergeTask> runningTasks = new IdentityHashSet<>();
    private final AtomicBoolean closed = new AtomicBoolean(false);
    protected final ExecutorService service;
    protected final Lock lock = new ReentrantLock();
    private final int maxPendingMerges;
    private final int maxRunningMerges;
    private final Condition mergeStarted = lock.newCondition();
    private final Condition noMoreMerges = lock.newCondition();
    private final boolean terminateService;

    public AbstractMergeScheduler(ExecutorService service, int maxPendingMerges, int maxRunningMerges, boolean terminateService) {
        this.service = service;
        this.maxPendingMerges = maxPendingMerges;
        this.maxRunningMerges = maxRunningMerges;
        this.terminateService = terminateService;
    }

    @Override
    public void merge(IndexWriter writer, MergeTrigger trigger, boolean newMergesFound) throws IOException {
        lock.lock();
        try {
            while (!closed.get()) {
                final List<MergeTask> merges = new ArrayList<>();
                // drain every merge the merge policy has registered with the writer
                MergePolicy.OneMerge nextMerge;
                while ((nextMerge = writer.getNextMerge()) != null) {
                    merges.add(new MergeTask(writer, nextMerge));
                }
                if (merges.isEmpty()) {
                    break;
                }
                CollectionUtil.timSort(merges);
                for (MergeTask task : merges) {
                    numPendingMerges.incrementAndGet();
                    boolean success = false;
                    try {
                        this.service.submit(task);
                        success = true;
                    } finally {
                        if (!success) {
                            numPendingMerges.decrementAndGet();
                        }
                    }
                }
            }
            awaitMaxPendingMerges();
        } finally {
            lock.unlock();
        }
    }

    public int numPendingMerges() {
        return numPendingMerges.get();
    }

    public int numRunningMerges() {
        return numRunningMerges.get();
    }

    @Override
    public final void close() throws IOException {
        if (closed.compareAndSet(false, true)) {
            closeInternal();
        }
    }

    public void closeInternal() {
        if (terminateService) {
            this.service.shutdown();
        }
        lock.lock();
        try {
            // block until every pending and running merge has completed
            while (numPendingMerges() != 0 || numRunningMerges() != 0) {
                noMoreMerges.await();
            }
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        } finally {
            lock.unlock();
        }
    }

    protected void runMerge(IndexWriter writer, MergePolicy.OneMerge oneMerge) throws IOException {
        writer.merge(oneMerge);
    }

    protected void onMergeStart(MergeTask mergeTask) {
        lock.lock();
        try {
            numRunningMerges.incrementAndGet();
            numPendingMerges.decrementAndGet();
            try {
                runningTasks.add(mergeTask);
                updateMergeThreads();
            } finally {
                // wake indexing threads blocked in awaitMaxPendingMerges()
                mergeStarted.signalAll();
            }
        } finally {
            lock.unlock();
        }
    }

    private void awaitMaxPendingMerges() {
        lock.lock();
        try {
            // throttle indexing: block until enough submitted merges have started
            while (numPendingMerges() > maxPendingMerges) {
                mergeStarted.await();
            }
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        } finally {
            lock.unlock();
        }
    }

    protected void onMergeEnd(MergeTask mergeTask) {
        numRunningMerges.decrementAndGet();
        lock.lock();
        try {
            try {
                runningTasks.remove(mergeTask);
                updateMergeThreads();
            } finally {
                // let close() return once nothing is pending or running anymore
                if (numPendingMerges() == 0 && numRunningMerges() == 0) {
                    noMoreMerges.signalAll();
                }
            }
        } finally {
            lock.unlock();
        }
    }

    protected void handleException(Throwable ex, IndexWriter writer) {
        try {
            // When an exception is hit during merge, IndexWriter
            // removes any partial files and then allows another
            // merge to run. If whatever caused the error is not
            // transient then the exception will keep happening,
            // so, we sleep here to avoid saturating CPU in such
            // cases:
            Thread.sleep(250);
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
        }
        throw new MergePolicy.MergeException(ex, writer.getDirectory());
    }

    @Override
    public MergeScheduler clone() {
        return new AbstractMergeScheduler(service, maxPendingMerges, maxRunningMerges, terminateService);
    }

    private void updateMergeThreads() {
        lock.lock();
        try {
            // sort the running merges by size and keep only the maxRunningMerges
            // smallest ones unpaused; the rest wait until a slot frees up
            MergeTask[] tasks = this.runningTasks.toArray(new MergeTask[runningTasks.size()]);
            ArrayUtil.timSort(tasks);
            final int limit = Math.min(tasks.length, maxRunningMerges);
            for (int i = 0; i < limit; i++) {
                tasks[i].unpause();
            }
            for (int i = limit; i < tasks.length; i++) {
                tasks[i].pause();
            }
        } finally {
            lock.unlock();
        }
    }

    final class MergeTask implements Callable<MergePolicy.OneMerge>, Comparable<MergeTask> {

        private final IndexWriter writer;
        private final MergePolicy.OneMerge oneMerge;

        MergeTask(IndexWriter writer, MergePolicy.OneMerge oneMerge) {
            this.writer = writer;
            this.oneMerge = oneMerge;
        }

        @Override
        public MergePolicy.OneMerge call() throws IOException {
            try {
                onMergeStart(this);
                runMerge(writer, oneMerge);
            } catch (Throwable ex) {
                handleException(ex, writer);
            } finally {
                onMergeEnd(this);
            }
            return oneMerge;
        }

        @Override
        public int compareTo(MergeTask o) {
            // order by total doc count so smaller merges get the free slots first
            return Integer.compare(oneMerge.totalDocCount, o.oneMerge.totalDocCount);
        }

        void pause() {
            if (!this.oneMerge.getPause()) {
                if (verbose()) {
                    message("pause thread " + Thread.currentThread().getName());
                }
                this.oneMerge.setPause(true);
            }
        }

        void unpause() {
            if (this.oneMerge.getPause()) {
                if (verbose()) {
                    message("unpause thread " + Thread.currentThread().getName());
                }
                this.oneMerge.setPause(false);
            }
        }

        protected boolean verbose() {
            return writer != null && writer.infoStream.isEnabled("CMS");
        }

        /**
         * Outputs the given message - this method assumes {@link #verbose()} was
         * called and returned true.
         */
        protected void message(String message) {
            writer.infoStream.message("CMS", message);
        }
    }
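
    /**
     * Runs every merge inline on the thread that triggered it:
     * {@code MoreExecutors.sameThreadExecutor()} executes each submitted
     * {@link MergeTask} immediately, and the limits of 1 keep at most one
     * merge in flight.
     */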
    public static class SerialMergeScheduler extends AbstractMergeScheduler {
        public SerialMergeScheduler() {
            super(MoreExecutors.sameThreadExecutor(), 1, 1, false);
        }
    }
}
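
Usage sketch (not part of the original gist): the wiring below shows how this scheduler might be handed a dedicated thread pool through IndexWriterConfig.setMergeScheduler, assuming Lucene 4.x-era APIs (Version.LUCENE_48 and the Version-taking StandardAnalyzer and IndexWriterConfig constructors). The pool size and the pending/running limits are illustrative values, not recommendations.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.index.AbstractMergeScheduler;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.RAMDirectory;
import org.apache.lucene.util.Version;

public class AbstractMergeSchedulerExample {
    public static void main(String[] args) throws Exception {
        // Dedicated pool for merge work; terminateService = true means the
        // scheduler shuts the pool down when the writer closes it.
        ExecutorService mergePool = Executors.newFixedThreadPool(4);
        // Illustrative limits: indexing blocks beyond 8 queued merges and at
        // most 2 merges run unpaused at any time.
        AbstractMergeScheduler scheduler = new AbstractMergeScheduler(mergePool, 8, 2, true);

        Directory dir = new RAMDirectory();
        IndexWriterConfig config = new IndexWriterConfig(Version.LUCENE_48, new StandardAnalyzer(Version.LUCENE_48));
        config.setMergeScheduler(scheduler);

        try (IndexWriter writer = new IndexWriter(dir, config)) {
            // add or update documents here; merges selected by the merge
            // policy are submitted to mergePool as MergeTask instances
            writer.commit();
        }
    }
}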