-
-
Save bjacobowitz/be9a54663217c46cc239cc9f9f580de8 to your computer and use it in GitHub Desktop.
WrappedCandidateMatcher enabling creation of ParallelMatcher outside package
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.foo.bar.something; | |
import java.io.IOException; | |
import java.util.ArrayList; | |
import java.util.Collections; | |
import java.util.List; | |
import java.util.Map; | |
import java.util.concurrent.BlockingQueue; | |
import java.util.concurrent.Callable; | |
import java.util.concurrent.ExecutionException; | |
import java.util.concurrent.ExecutorService; | |
import java.util.concurrent.Future; | |
import java.util.concurrent.LinkedBlockingQueue; | |
import org.apache.lucene.monitor.*; | |
import org.apache.lucene.search.IndexSearcher; | |
import org.apache.lucene.search.Query; | |
/** | |
* Matcher class that runs matching queries in parallel. | |
* | |
* <p>This class delegates the actual matching to separate CandidateMatcher classes, built from a | |
* passed in MatcherFactory. | |
* | |
* <p>Use this when individual queries can take a long time to run, and you want to minimize | |
* latency. The matcher distributes queries amongst its worker threads using a BlockingQueue, and | |
* synchronization overhead may affect performance if the individual queries are very fast. | |
* | |
* @param <T> the QueryMatch type returned | |
* @see PartitionMatcher | |
*/ | |
public class ParallelMatcher<T extends QueryMatch> extends CandidateMatcher<T> { | |
private final BlockingQueue<MatcherTask> queue = new LinkedBlockingQueue<>(1024); | |
private final List<Future<CandidateMatcher<T>>> futures = new ArrayList<>(); | |
private final CandidateMatcher<T> collectorMatcher; | |
/** | |
* Create a new ParallelMatcher | |
* | |
* @param searcher the IndexSearcher to match against | |
* @param executor an ExecutorService to use for parallel execution | |
* @param matcherFactory MatcherFactory to use to create CandidateMatchers | |
* @param threads the number of threads to execute on | |
*/ | |
private ParallelMatcher( | |
IndexSearcher searcher, | |
ExecutorService executor, | |
MatcherFactory<T> matcherFactory, | |
int threads) { | |
super(searcher); | |
for (int i = 0; i < threads; i++) { | |
MatcherWorker mw = new MatcherWorker(matcherFactory); | |
futures.add(executor.submit(mw)); | |
} | |
collectorMatcher = matcherFactory.createMatcher(searcher); | |
} | |
@Override | |
protected void matchQuery(String queryId, Query matchQuery, Map<String, String> metadata) | |
throws IOException { | |
try { | |
queue.put(new ParallelMatcher.MatcherTask(queryId, matchQuery, metadata)); | |
} catch (InterruptedException e) { | |
throw new IOException("Interrupted during match", e); | |
} | |
} | |
@Override | |
public T resolve(T match1, T match2) { | |
return collectorMatcher.resolve(match1, match2); | |
} | |
@Override | |
protected void doFinish() { | |
try { | |
for (int i = 0; i < futures.size(); i++) { | |
queue.put(END); | |
} | |
for (Future<CandidateMatcher<T>> future : futures) { | |
// PROBLEM: .finish() is not accessible | |
MultiMatchingQueries<T> matches = future.get().finish(0, 0); | |
for (int doc = 0; doc < matches.getBatchSize(); doc++) { | |
for (T match : matches.getMatches(doc)) { | |
this.addMatch(match, doc); | |
} | |
} | |
for (Map.Entry<String, Exception> error : matches.getErrors().entrySet()) { | |
// PROBLEM: .reportError() is not accessible | |
this.reportError(error.getKey(), error.getValue()); | |
} | |
} | |
} catch (InterruptedException | ExecutionException e) { | |
throw new RuntimeException("Interrupted during match", e); | |
} | |
} | |
private class MatcherWorker implements Callable<CandidateMatcher<T>> { | |
final CandidateMatcher<T> matcher; | |
private MatcherWorker(MatcherFactory<T> matcherFactory) { | |
this.matcher = matcherFactory.createMatcher(searcher); | |
} | |
@Override | |
public CandidateMatcher<T> call() { | |
MatcherTask task; | |
try { | |
while ((task = queue.take()) != END) { | |
try { | |
// PROBLEM: .matchQuery() is not accessible | |
matcher.matchQuery(task.id, task.matchQuery, task.metadata); | |
} catch (IOException e) { | |
// PROBLEM: .reportError() is not accessible | |
matcher.reportError(task.id, e); | |
} | |
} | |
} catch (InterruptedException e) { | |
throw new RuntimeException("Interrupted during match", e); | |
} | |
return matcher; | |
} | |
} | |
private static class MatcherTask { | |
final String id; | |
final Query matchQuery; | |
final Map<String, String> metadata; | |
private MatcherTask(String id, Query matchQuery, Map<String, String> metadata) { | |
this.id = id; | |
this.matchQuery = matchQuery; | |
this.metadata = metadata; | |
} | |
} | |
/* Marker object placed on the queue after all matches are done, to indicate to the | |
worker threads that they should finish */ | |
private static final MatcherTask END = new MatcherTask("", null, Collections.emptyMap()); | |
private static class ParallelMatcherFactory<T extends QueryMatch> implements MatcherFactory<T> { | |
private final ExecutorService executor; | |
private final MatcherFactory<T> matcherFactory; | |
private final int threads; | |
ParallelMatcherFactory( | |
ExecutorService executor, MatcherFactory<T> matcherFactory, int threads) { | |
this.executor = executor; | |
this.matcherFactory = matcherFactory; | |
this.threads = threads; | |
} | |
@Override | |
public ParallelMatcher<T> createMatcher(IndexSearcher searcher) { | |
return new ParallelMatcher<>(searcher, executor, matcherFactory, threads); | |
} | |
} | |
/** | |
* Create a new MatcherFactory for a ParallelMatcher | |
* | |
* @param executor the ExecutorService to use | |
* @param matcherFactory the MatcherFactory to use to create submatchers | |
* @param threads the number of threads to use | |
* @param <T> the type of QueryMatch generated | |
*/ | |
public static <T extends QueryMatch> MatcherFactory<T> factory( | |
ExecutorService executor, MatcherFactory<T> matcherFactory, int threads) { | |
return new ParallelMatcher.ParallelMatcherFactory<>(executor, matcherFactory, threads); | |
} | |
/** | |
* Create a new MatcherFactory for a ParallelMatcher | |
* | |
* <p>This factory will create a ParallelMatcher that uses as many threads as there are cores | |
* available to the JVM (as determined by {@code Runtime.getRuntime().availableProcessors()}). | |
* | |
* @param executor the ExecutorService to use | |
* @param matcherFactory the MatcherFactory to use to create submatchers | |
* @param <T> the type of QueryMatch generated | |
*/ | |
public static <T extends QueryMatch> MatcherFactory<T> factory( | |
ExecutorService executor, MatcherFactory<T> matcherFactory) { | |
int threads = Runtime.getRuntime().availableProcessors(); | |
return new ParallelMatcherFactory<>(executor, matcherFactory, threads); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.foo.bar.something; | |
import java.io.IOException; | |
import java.util.ArrayList; | |
import java.util.Collections; | |
import java.util.List; | |
import java.util.Map; | |
import java.util.concurrent.BlockingQueue; | |
import java.util.concurrent.Callable; | |
import java.util.concurrent.ExecutionException; | |
import java.util.concurrent.ExecutorService; | |
import java.util.concurrent.Future; | |
import java.util.concurrent.LinkedBlockingQueue; | |
import org.apache.lucene.monitor.*; | |
import org.apache.lucene.search.IndexSearcher; | |
import org.apache.lucene.search.Query; | |
/** | |
* Matcher class that runs matching queries in parallel. | |
* | |
* <p>This class delegates the actual matching to separate CandidateMatcher classes, built from a | |
* passed in MatcherFactory. | |
* | |
* <p>Use this when individual queries can take a long time to run, and you want to minimize | |
* latency. The matcher distributes queries amongst its worker threads using a BlockingQueue, and | |
* synchronization overhead may affect performance if the individual queries are very fast. | |
* | |
* @param <T> the QueryMatch type returned | |
* @see PartitionMatcher | |
*/ | |
public class ParallelMatcherUsingWrapped<T extends QueryMatch> extends CandidateMatcher<T> { | |
private final BlockingQueue<MatcherTask> queue = new LinkedBlockingQueue<>(1024); | |
private final List<Future<WrappedCandidateMatcher<T>>> futures = new ArrayList<>(); | |
private final WrappedCandidateMatcher<T> collectorMatcher; | |
/** | |
* Create a new ParallelMatcher | |
* | |
* @param searcher the IndexSearcher to match against | |
* @param executor an ExecutorService to use for parallel execution | |
* @param matcherFactory MatcherFactory to use to create CandidateMatchers | |
* @param threads the number of threads to execute on | |
*/ | |
private ParallelMatcherUsingWrapped( | |
IndexSearcher searcher, | |
ExecutorService executor, | |
WrappedMatcherFactory<T> matcherFactory, | |
int threads) { | |
super(searcher); | |
for (int i = 0; i < threads; i++) { | |
MatcherWorker mw = new MatcherWorker(matcherFactory); | |
futures.add(executor.submit(mw)); | |
} | |
collectorMatcher = matcherFactory.createWrappedMatcher(searcher); | |
} | |
@Override | |
public void matchQuery(String queryId, Query matchQuery, Map<String, String> metadata) | |
throws IOException { | |
try { | |
queue.put(new MatcherTask(queryId, matchQuery, metadata)); | |
} catch (InterruptedException e) { | |
throw new IOException("Interrupted during match", e); | |
} | |
} | |
@Override | |
public T resolve(T match1, T match2) { | |
return collectorMatcher.resolve(match1, match2); | |
} | |
@Override | |
protected void doFinish() { | |
try { | |
for (int i = 0; i < futures.size(); i++) { | |
queue.put(END); | |
} | |
for (Future<WrappedCandidateMatcher<T>> future : futures) { | |
// SOLUTION: .finish() is accessible on WrappedCandidateMatcher | |
MultiMatchingQueries<T> matches = future.get().finish(0, 0); | |
for (int doc = 0; doc < matches.getBatchSize(); doc++) { | |
for (T match : matches.getMatches(doc)) { | |
this.addMatch(match, doc); | |
} | |
} | |
for (Map.Entry<String, Exception> error : matches.getErrors().entrySet()) { | |
// SOLUTION: .reportError() is accessible WrappedCandidateMatcher | |
collectorMatcher.reportError(error.getKey(), error.getValue()); | |
} | |
} | |
} catch (InterruptedException | ExecutionException e) { | |
throw new RuntimeException("Interrupted during match", e); | |
} | |
} | |
private class MatcherWorker implements Callable<WrappedCandidateMatcher<T>> { | |
final WrappedCandidateMatcher<T> matcher; | |
private MatcherWorker(WrappedMatcherFactory<T> matcherFactory) { | |
this.matcher = matcherFactory.createWrappedMatcher(searcher); | |
} | |
@Override | |
public WrappedCandidateMatcher<T> call() { | |
MatcherTask task; | |
try { | |
while ((task = queue.take()) != END) { | |
try { | |
// SOLUTION: .matchQuery() is accessible on WrappedCandidateMatcher | |
matcher.matchQuery(task.id, task.matchQuery, task.metadata); | |
} catch (IOException e) { | |
// SOLUTION: .matchQuery() is accessible on WrappedCandidateMatcher | |
matcher.reportError(task.id, e); | |
} | |
} | |
} catch (InterruptedException e) { | |
throw new RuntimeException("Interrupted during match", e); | |
} | |
return matcher; | |
} | |
} | |
private static class MatcherTask { | |
final String id; | |
final Query matchQuery; | |
final Map<String, String> metadata; | |
private MatcherTask(String id, Query matchQuery, Map<String, String> metadata) { | |
this.id = id; | |
this.matchQuery = matchQuery; | |
this.metadata = metadata; | |
} | |
} | |
/* Marker object placed on the queue after all matches are done, to indicate to the | |
worker threads that they should finish */ | |
private static final MatcherTask END = new MatcherTask("", null, Collections.emptyMap()); | |
private static class WrappedParallelMatcherFactory<T extends QueryMatch> | |
implements MatcherFactory<T> { | |
private final ExecutorService executor; | |
private final WrappedMatcherFactory<T> matcherFactory; | |
private final int threads; | |
WrappedParallelMatcherFactory( | |
ExecutorService executor, WrappedMatcherFactory<T> matcherFactory, int threads) { | |
this.executor = executor; | |
this.matcherFactory = matcherFactory; | |
this.threads = threads; | |
} | |
@Override | |
public ParallelMatcherUsingWrapped<T> createMatcher(IndexSearcher searcher) { | |
return new ParallelMatcherUsingWrapped<>(searcher, executor, matcherFactory, threads); | |
} | |
} | |
/** | |
* Create a new MatcherFactory for a ParallelMatcher | |
* | |
* @param executor the ExecutorService to use | |
* @param matcherFactory the MatcherFactory to use to create submatchers | |
* @param threads the number of threads to use | |
* @param <T> the type of QueryMatch generated | |
*/ | |
public static <T extends QueryMatch> MatcherFactory<T> factory( | |
ExecutorService executor, WrappedMatcherFactory<T> matcherFactory, int threads) { | |
return new WrappedParallelMatcherFactory<>(executor, matcherFactory, threads); | |
} | |
/** | |
* Create a new MatcherFactory for a ParallelMatcher | |
* | |
* <p>This factory will create a ParallelMatcher that uses as many threads as there are cores | |
* available to the JVM (as determined by {@code Runtime.getRuntime().availableProcessors()}). | |
* | |
* @param executor the ExecutorService to use | |
* @param matcherFactory the MatcherFactory to use to create submatchers | |
* @param <T> the type of QueryMatch generated | |
*/ | |
public static <T extends QueryMatch> MatcherFactory<T> factory( | |
ExecutorService executor, WrappedMatcherFactory<T> matcherFactory) { | |
int threads = Runtime.getRuntime().availableProcessors(); | |
return new WrappedParallelMatcherFactory<>(executor, matcherFactory, threads); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment