Created
April 21, 2017 10:56
-
-
Save fakeh/2a4368dd8f0e17c3b20a747e413dc193 to your computer and use it in GitHub Desktop.
Keeping up to date with DNS changes to a Elasticsearch host.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.net.InetAddress; | |
import java.net.InetSocketAddress; | |
import java.net.UnknownHostException; | |
import java.util.List; | |
import java.util.concurrent.Executors; | |
import java.util.concurrent.ScheduledExecutorService; | |
import java.util.concurrent.TimeUnit; | |
import java.util.logging.Level; | |
import java.util.logging.Logger; | |
import java.util.stream.Collectors; | |
import org.elasticsearch.action.Action; | |
import org.elasticsearch.action.ActionListener; | |
import org.elasticsearch.action.ActionRequest; | |
import org.elasticsearch.action.ActionRequestBuilder; | |
import org.elasticsearch.action.ActionResponse; | |
import org.elasticsearch.client.FilterClient; | |
import org.elasticsearch.client.transport.NoNodeAvailableException; | |
import org.elasticsearch.client.transport.TransportClient; | |
import org.elasticsearch.common.transport.InetSocketTransportAddress; | |
import org.elasticsearch.common.transport.TransportAddress; | |
public class ElasticDnsTrackerClient extends FilterClient { | |
private static final Logger LOG = Logger.getLogger(ElasticDnsTrackerClient.class.getName()); | |
private ScheduledExecutorService executor = | |
Executors.newSingleThreadScheduledExecutor(r -> new Thread(r, ElasticDnsTrackerClient.class.getSimpleName())); | |
private List<InetSocketAddress> baseAddresses; | |
public ElasticDnsTrackerClient(TransportClient client) { | |
super(client); | |
this.baseAddresses = client.transportAddresses().parallelStream() | |
.filter(InetSocketTransportAddress.class::isInstance) | |
.map(InetSocketTransportAddress.class::cast) | |
.map(InetSocketTransportAddress::address) | |
.collect(Collectors.toList()); | |
executor.scheduleAtFixedRate(() -> { | |
LOG.fine("Reresolving IP addresses regularly."); | |
reresolve(); | |
}, 5, 5, TimeUnit.MINUTES); | |
} | |
public List<TransportAddress> reresolve() { //Public so it can be called from tooling | |
LOG.fine("Reresolving IP addresses for "+ baseAddresses.stream() | |
.map(InetSocketAddress::getHostString).collect(Collectors.toList())); | |
List<TransportAddress> previousAddresses = in().transportAddresses(); | |
for(InetSocketAddress baseAddress : this.baseAddresses){ | |
try { | |
for (InetAddress inetAddress : InetAddress.getAllByName(baseAddress.getHostName())) { | |
in().addTransportAddress(new InetSocketTransportAddress(inetAddress, baseAddress.getPort())); | |
} | |
} catch (UnknownHostException e) { | |
LOG.log(Level.SEVERE, "Couldn't get any addresses for hostname "+ baseAddress.getHostName(), e); | |
} | |
} | |
List<TransportAddress> transportAddresses = in().transportAddresses(); | |
if( ! previousAddresses.equals(transportAddresses)){ | |
LOG.info("Changed transport addresses. Was "+ previousAddresses +", now "+ transportAddresses); | |
} | |
return transportAddresses; | |
} | |
@Override | |
protected <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder>> void doExecute( | |
Action<Request, Response, RequestBuilder> action, Request request, ActionListener<Response> listener) { | |
ActionListener<Response> retryListener = ActionListener.<Response>wrap(listener::onResponse, f -> { | |
if( ! (f instanceof NoNodeAvailableException)) { | |
listener.onFailure(f); //not a failure we can cope with, just propagate it. | |
return; | |
} | |
try{ | |
retry(action, request, listener, (NoNodeAvailableException) f); | |
}catch(Exception e){ | |
listener.onFailure(e); //Because subsequent Exceptions are thrown immediately and not processed by the listener, these are caught and propagated properly... | |
} | |
}); | |
try{ | |
super.doExecute(action, request, retryListener); | |
}catch(NoNodeAvailableException e){ //The first time the node cannot be reached, the exception is passed to the listener, subsequent calls without a new node becoming available throw exceptions immediately. | |
retry(action, request, listener, e); //Don't send exceptions to the listener, because we're not in a listenning context, presumably. | |
} | |
} | |
private <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder>> void retry( | |
Action<Request, Response, RequestBuilder> action, Request request, ActionListener<Response> listener, NoNodeAvailableException e) { | |
LOG.log(Level.FINE, "Reresolving IP addresses because of a NoNodeAvailableException", e); | |
reresolve(); | |
super.doExecute(action, request, listener); //retry | |
} | |
@Override protected TransportClient in() { | |
return (TransportClient) super.in(); | |
} | |
@Override public void close() { | |
executor.shutdown(); | |
super.close(); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment