Skip to content

Instantly share code, notes, and snippets.

@fakeh
Created April 21, 2017 10:56
Show Gist options
  • Save fakeh/2a4368dd8f0e17c3b20a747e413dc193 to your computer and use it in GitHub Desktop.
Save fakeh/2a4368dd8f0e17c3b20a747e413dc193 to your computer and use it in GitHub Desktop.
Keeping up to date with DNS changes to a Elasticsearch host.
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Collectors;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.client.FilterClient;
import org.elasticsearch.client.transport.NoNodeAvailableException;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.transport.TransportAddress;
public class ElasticDnsTrackerClient extends FilterClient {
private static final Logger LOG = Logger.getLogger(ElasticDnsTrackerClient.class.getName());
private ScheduledExecutorService executor =
Executors.newSingleThreadScheduledExecutor(r -> new Thread(r, ElasticDnsTrackerClient.class.getSimpleName()));
private List<InetSocketAddress> baseAddresses;
public ElasticDnsTrackerClient(TransportClient client) {
super(client);
this.baseAddresses = client.transportAddresses().parallelStream()
.filter(InetSocketTransportAddress.class::isInstance)
.map(InetSocketTransportAddress.class::cast)
.map(InetSocketTransportAddress::address)
.collect(Collectors.toList());
executor.scheduleAtFixedRate(() -> {
LOG.fine("Reresolving IP addresses regularly.");
reresolve();
}, 5, 5, TimeUnit.MINUTES);
}
public List<TransportAddress> reresolve() { //Public so it can be called from tooling
LOG.fine("Reresolving IP addresses for "+ baseAddresses.stream()
.map(InetSocketAddress::getHostString).collect(Collectors.toList()));
List<TransportAddress> previousAddresses = in().transportAddresses();
for(InetSocketAddress baseAddress : this.baseAddresses){
try {
for (InetAddress inetAddress : InetAddress.getAllByName(baseAddress.getHostName())) {
in().addTransportAddress(new InetSocketTransportAddress(inetAddress, baseAddress.getPort()));
}
} catch (UnknownHostException e) {
LOG.log(Level.SEVERE, "Couldn't get any addresses for hostname "+ baseAddress.getHostName(), e);
}
}
List<TransportAddress> transportAddresses = in().transportAddresses();
if( ! previousAddresses.equals(transportAddresses)){
LOG.info("Changed transport addresses. Was "+ previousAddresses +", now "+ transportAddresses);
}
return transportAddresses;
}
@Override
protected <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder>> void doExecute(
Action<Request, Response, RequestBuilder> action, Request request, ActionListener<Response> listener) {
ActionListener<Response> retryListener = ActionListener.<Response>wrap(listener::onResponse, f -> {
if( ! (f instanceof NoNodeAvailableException)) {
listener.onFailure(f); //not a failure we can cope with, just propagate it.
return;
}
try{
retry(action, request, listener, (NoNodeAvailableException) f);
}catch(Exception e){
listener.onFailure(e); //Because subsequent Exceptions are thrown immediately and not processed by the listener, these are caught and propagated properly...
}
});
try{
super.doExecute(action, request, retryListener);
}catch(NoNodeAvailableException e){ //The first time the node cannot be reached, the exception is passed to the listener, subsequent calls without a new node becoming available throw exceptions immediately.
retry(action, request, listener, e); //Don't send exceptions to the listener, because we're not in a listenning context, presumably.
}
}
private <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder>> void retry(
Action<Request, Response, RequestBuilder> action, Request request, ActionListener<Response> listener, NoNodeAvailableException e) {
LOG.log(Level.FINE, "Reresolving IP addresses because of a NoNodeAvailableException", e);
reresolve();
super.doExecute(action, request, listener); //retry
}
@Override protected TransportClient in() {
return (TransportClient) super.in();
}
@Override public void close() {
executor.shutdown();
super.close();
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment