Skip to content

Instantly share code, notes, and snippets.

@dakrone

dakrone/foo.diff Secret

Created August 21, 2017 20:43
Show Gist options
  • Save dakrone/e51881e25aaa2a9bd548465d08fe9162 to your computer and use it in GitHub Desktop.
Save dakrone/e51881e25aaa2a9bd548465d08fe9162 to your computer and use it in GitHub Desktop.
diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java b/core/src/main/java/org/elasticsearch/acti
on/search/SearchTransportService.java
index 8ac8593a51d..a4de78947e4 100644
--- a/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java
+++ b/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java
@@ -30,6 +30,7 @@ import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.search.SearchPhaseResult;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.search.dfs.DfsSearchResult;
@@ -50,6 +51,7 @@ import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportActionProxy;
import org.elasticsearch.transport.TaskAwareTransportRequestHandler;
import org.elasticsearch.transport.TransportChannel;
+import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportResponse;
@@ -57,6 +59,7 @@ import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.io.UncheckedIOException;
+import java.util.Collections;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Supplier;
@@ -81,6 +84,7 @@ public class SearchTransportService extends AbstractComponent {
private final TransportService transportService;
private final BiFunction<Transport.Connection, SearchActionListener, ActionListener> responseWrapper;
+ private final Map<String, Long> clientConnections = ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency();
public SearchTransportService(Settings settings, TransportService transportService,
BiFunction<Transport.Connection, SearchActionListener, ActionListener> responseWrapper) {
@@ -132,7 +136,7 @@ public class SearchTransportService extends AbstractComponent {
public void sendExecuteDfs(Transport.Connection connection, final ShardSearchTransportRequest request, SearchTask task,
final SearchActionListener<DfsSearchResult> listener) {
transportService.sendChildRequest(connection, DFS_ACTION_NAME, request, task,
- new ActionListenerResponseHandler<>(listener, DfsSearchResult::new));
+ new ConnectionCountingHandler<>(listener, DfsSearchResult::new, clientConnections, connection.getNode().getId()));
}
public void sendExecuteQuery(Transport.Connection connection, final ShardSearchTransportRequest request, SearchTask task,
@@ -144,25 +148,26 @@ public class SearchTransportService extends AbstractComponent {
final ActionListener handler = responseWrapper.apply(connection, listener);
transportService.sendChildRequest(connection, QUERY_ACTION_NAME, request, task,
- new ActionListenerResponseHandler<>(handler, supplier));
+ new ConnectionCountingHandler<>(handler, supplier, clientConnections, connection.getNode().getId()));
}
public void sendExecuteQuery(Transport.Connection connection, final QuerySearchRequest request, SearchTask task,
final SearchActionListener<QuerySearchResult> listener) {
transportService.sendChildRequest(connection, QUERY_ID_ACTION_NAME, request, task,
- new ActionListenerResponseHandler<>(listener, QuerySearchResult::new));
+ new ConnectionCountingHandler<>(listener, QuerySearchResult::new, clientConnections, connection.getNode().getId()));
}
public void sendExecuteScrollQuery(Transport.Connection connection, final InternalScrollSearchRequest request, SearchTask task,
final SearchActionListener<ScrollQuerySearchResult> listener) {
transportService.sendChildRequest(connection, QUERY_SCROLL_ACTION_NAME, request, task,
- new ActionListenerResponseHandler<>(listener, ScrollQuerySearchResult::new));
+ new ConnectionCountingHandler<>(listener, ScrollQuerySearchResult::new, clientConnections, connection.getNode().getId(
)));
}
public void sendExecuteScrollFetch(Transport.Connection connection, final InternalScrollSearchRequest request, SearchTask task,
final SearchActionListener<ScrollQueryFetchSearchResult> listener) {
transportService.sendChildRequest(connection, QUERY_FETCH_SCROLL_ACTION_NAME, request, task,
- new ActionListenerResponseHandler<>(listener, ScrollQueryFetchSearchResult::new));
+ new ConnectionCountingHandler<>(listener, ScrollQueryFetchSearchResult::new,
+ clientConnections, connection.getNode().getId()));
}
public void sendExecuteFetch(Transport.Connection connection, final ShardFetchSearchRequest request, SearchTask task,
@@ -178,16 +183,17 @@ public class SearchTransportService extends AbstractComponent {
private void sendExecuteFetch(Transport.Connection connection, String action, final ShardFetchRequest request, SearchTask task,
final SearchActionListener<FetchSearchResult> listener) {
transportService.sendChildRequest(connection, action, request, task,
- new ActionListenerResponseHandler<>(listener, FetchSearchResult::new));
+ new ConnectionCountingHandler<>(listener, FetchSearchResult::new, clientConnections, connection.getNode().getId()));
}
/**
* Used by {@link TransportSearchAction} to send the expand queries (field collapsing).
*/
void sendExecuteMultiSearch(final MultiSearchRequest request, SearchTask task,
- final ActionListener<MultiSearchResponse> listener) {
- transportService.sendChildRequest(transportService.getConnection(transportService.getLocalNode()), MultiSearchAction.NAME, req
uest,
- task, new ActionListenerResponseHandler<>(listener, MultiSearchResponse::new));
+ final ActionListener<MultiSearchResponse> listener) {
+ final Transport.Connection connection = transportService.getConnection(transportService.getLocalNode());
+ transportService.sendChildRequest(connection, MultiSearchAction.NAME, request, task,
+ new ConnectionCountingHandler<>(listener, MultiSearchResponse::new, clientConnections, connection.getNode().getId()));
}
public RemoteClusterService getRemoteClusterService() {
@@ -198,7 +204,7 @@ public class SearchTransportService extends AbstractComponent {
* Return a map of nodeId to pending number of requests for the given action name prefix
*/
public Map<String, Long> getPendingRequests(final String actionNamePrefix) {
- return transportService.getPendingRequests(actionNamePrefix);
+ return Collections.unmodifiableMap(clientConnections);
}
static class ScrollFreeContextRequest extends TransportRequest {
@@ -486,4 +492,32 @@ public class SearchTransportService extends AbstractComponent {
return transportService.getRemoteClusterService().getConnection(node, clusterAlias);
}
}
+
+ final class ConnectionCountingHandler<Response extends TransportResponse> extends ActionListenerResponseHandler<Response> {
+ private final Map<String, Long> clientConnections;
+ private final String nodeId;
+
+ ConnectionCountingHandler(final ActionListener<? super Response> listener, final Supplier<Response> responseSupplier,
+ final Map<String, Long> clientConnections, final String nodeId) {
+ super(listener, responseSupplier);
+ this.clientConnections = clientConnections;
+ this.nodeId = nodeId;
+ // Increment the number of connections for this node by one
+ clientConnections.compute(nodeId, (id, conns) -> conns == null ? 0 : conns + 1);
+ }
+
+ @Override
+ public void handleResponse(Response response) {
+ super.handleResponse(response);
+ // Decrement the number of connections or remove it entirely if there are no more connections
+ clientConnections.computeIfPresent(nodeId, (id, conns) -> conns == 0 ? null : conns - 1);
+ }
+
+ @Override
+ public void handleException(TransportException e) {
+ super.handleException(e);
+ // Decrement the number of connections or remove it entirely if there are no more connections
+ clientConnections.computeIfPresent(nodeId, (id, conns) -> conns == 0 ? null : conns - 1);
+ }
+ }
}
diff --git a/core/src/main/java/org/elasticsearch/transport/TransportService.java b/core/src/main/java/org/elasticsearch/transport/Tran
sportService.java
index 325b67fb467..4bf06374004 100644
--- a/core/src/main/java/org/elasticsearch/transport/TransportService.java
+++ b/core/src/main/java/org/elasticsearch/transport/TransportService.java
@@ -561,21 +561,6 @@ public class TransportService extends AbstractLifecycleComponent {
}
- /**
- * Return a map of nodeId to pending number of requests for the given action name prefix
- */
- public Map<String, Long> getPendingRequests(final String actionNamePrefix) {
- Map<String, Long> nodeCounts = new HashMap<>();
- for (Map.Entry<Long, RequestHolder> entry : clientHandlers.entrySet()) {
- RequestHolder reqHolder = entry.getValue();
- if (reqHolder.action().startsWith(actionNamePrefix)) {
- String nodeId = reqHolder.connection().getNode().getId();
- nodeCounts.put(nodeId, nodeCounts.getOrDefault(nodeId, 0L) + 1);
- }
- }
- return nodeCounts;
- }
-
private <T extends TransportResponse> void sendRequestInternal(final Transport.Connection connection, final String action,
final TransportRequest request,
final TransportRequestOptions options,
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment