Skip to content

Instantly share code, notes, and snippets.

@vaughnd
Created December 23, 2015 14:04
Show Gist options
  • Star 4 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save vaughnd/22fe0670a296ebcb436c to your computer and use it in GitHub Desktop.
Save vaughnd/22fe0670a296ebcb436c to your computer and use it in GitHub Desktop.
package com.vaughndickson.elasticsearch
import groovy.util.logging.Slf4j
import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus
import org.elasticsearch.client.Client
import org.elasticsearch.client.transport.TransportClient
import org.elasticsearch.common.settings.ImmutableSettings
import org.elasticsearch.common.settings.Settings
import org.elasticsearch.common.transport.InetSocketTransportAddress
import org.springframework.beans.factory.DisposableBean
import org.springframework.beans.factory.FactoryBean
import org.springframework.beans.factory.InitializingBean
import sun.net.spi.nameservice.dns.DNSNameService
/**
 * Spring {@link FactoryBean} that builds, exposes and disposes a single Elasticsearch {@link TransportClient}.
 *
 * Two flavours are supported, selected by {@code elasticsearchType}:
 * <ul>
 *   <li>{@code normal} - plain transport client, default port 9300, Shield disabled</li>
 *   <li>anything else (i.e. {@code found}) - Shield client over SSL, default port 9343, plus a background
 *       task that periodically re-resolves the host name and adds any new IPs to the client</li>
 * </ul>
 *
 * The client is created in {@link #afterPropertiesSet()} and released in {@link #destroy()}.
 */
@Slf4j
public class ElasticsearchClient implements FactoryBean<Client>, InitializingBean, DisposableBean {

    /** Deployment flavours understood by {@link #afterPropertiesSet()}. */
    public enum ELASTIC_SEARCH_TYPE {normal,found}

    // ES configuration properties

    String elasticsearchType // normal or found

    String elasticsearchClientName // identifiable name, e.g. contentStoreEsClient for debugging

    // local: localhost:9300
    // found: xyz.us-east-1.aws.found.io:9343
    String elasticsearchTransportUri

    // local: cluster
    // found: blank (calculated from TransportUri hash above, e.g. xyz)
    String elasticsearchClustername

    // local: blank
    // found: shield configured username
    String elasticsearchUsername

    // local: blank
    // found: shield configured password
    String elasticsearchPassword

    // instance variables

    protected TransportClient client
    protected int defaultPort
    protected Timer dnsRefresherTimer

    // refresh DNS every minute, as the Found ELB timeout is 5min
    protected final static long DNS_REFRESH_TIME = 60 * 1000L

    /**
     * Build shield client over SSL and authentication.
     *
     * The cluster id is the first label of the host name in {@code elasticsearchTransportUri}
     * (e.g. {@code xyz} for {@code xyz.us-east-1.aws.found.io:9343}).
     *
     * @param defaultPort port to use when the transport URI does not carry one
     * @return Client client
     */
    protected TransportClient buildShieldClient(int defaultPort) {
        // split() rather than substring(0, indexOf(":")): the latter throws when the URI has no port,
        // although a missing port is a supported case (defaultPort is used instead).
        String host = elasticsearchTransportUri.split(":")[0]
        String foundClusterId = host.split("\\.", 2)[0]

        // Build the settings for our client.
        Settings settings = ImmutableSettings.settingsBuilder()
                .put("transport.ping_schedule", "5s")
                .put("cluster.name", foundClusterId)
                .put("action.bulk.compress", false)
                .put("shield.transport.ssl", "true")
                .put("request.headers.X-Found-Cluster", foundClusterId)
                .put("shield.user", elasticsearchUsername + ":" + elasticsearchPassword)
                .build()

        // Only port 9343 (SSL-encrypted) is currently supported.
        return connect(settings, defaultPort)
    }

    /**
     * Build normal elasticsearch transport client using only host and cluster name
     *
     * @param defaultPort port to use when the transport URI does not carry one
     * @return Client client
     */
    protected TransportClient buildNormalClient(int defaultPort) {
        // Build the settings for our client.
        Settings settings = ImmutableSettings.settingsBuilder()
                .put("transport.ping_schedule", "5s")
                .put("cluster.name", elasticsearchClustername)
                .put("action.bulk.compress", false)
                .put("shield.enabled", false)
                .build()

        return connect(settings, defaultPort)
    }

    /**
     * Instantiate a TransportClient for the given settings, add the configured transport address and
     * run a cluster health request to verify the connection.
     *
     * If anything fails after the client has been created, the client is closed before the exception
     * is rethrown, so a failed start-up does not leak the client's resources.
     */
    private TransportClient connect(Settings settings, int defaultPort) {
        TransportClient newClient = new TransportClient(settings)
        try {
            newClient.addTransportAddress(toAddress(elasticsearchTransportUri, defaultPort))
            ClusterHealthStatus status = newClient.admin().cluster().prepareHealth().get().getStatus()
            log.info("Connected to Elasticsearch instance: " + elasticsearchTransportUri + ", status: " + status + ", client: " + elasticsearchClientName)
            return newClient
        } catch (Exception e) {
            try {
                newClient.close()
            } catch (Exception closeFailure) {
                // keep the original failure as the one that propagates
                log.warn("Error closing ElasticSearch client after failed connect: ", closeFailure)
            }
            throw e
        }
    }

    /**
     * Parse a {@code host[:port]} string into a transport address.
     *
     * @param address host, optionally followed by {@code :port}
     * @param defaultPort port used when {@code address} has no port part
     * @return the transport address, or null when {@code address} is null
     */
    private InetSocketTransportAddress toAddress(String address, int defaultPort) {
        if (address == null) {
            return null
        }
        String[] parts = address.split(":")
        int port = parts.length > 1 ? Integer.parseInt(parts[1]) : defaultPort
        return new InetSocketTransportAddress(parts[0], port)
    }

    // Factory bean methods

    /**
     * Create the client once all properties are set.
     *
     * @throws IllegalArgumentException when no transport URI is configured
     */
    void afterPropertiesSet() throws Exception {
        log.info("Starting ElasticSearch client " + elasticsearchClientName)
        if (elasticsearchTransportUri == null) {
            throw new IllegalArgumentException("No ES transport URI specified, please configure!")
        }
        if (elasticsearchType as ELASTIC_SEARCH_TYPE == ELASTIC_SEARCH_TYPE.normal) {
            defaultPort = 9300
            client = buildNormalClient(defaultPort)
        } else {
            defaultPort = 9343
            client = buildShieldClient(defaultPort)
            startDnsRefresherTask()
        }
    }

    /** @return the client built in {@link #afterPropertiesSet()} */
    Client getObject() throws Exception {
        log.info("ELASTICSEARCH CLIENT: Getting " + elasticsearchClientName + " " + client + " of type " + elasticsearchType)
        return client
    }

    Class<Client> getObjectType() {
        return Client.class
    }

    boolean isSingleton() {
        return true
    }

    /**
     * Stop the DNS refresher (if it was started) and close the client. Failures are logged, not rethrown.
     */
    void destroy() throws Exception {
        try {
            log.info("Closing ElasticSearch client " + elasticsearchClientName)
            // Stop the refresher first so it neither outlives the bean nor touches a closed client.
            if (dnsRefresherTimer != null) {
                dnsRefresherTimer.cancel()
                dnsRefresherTimer = null
            }
            if (client != null) {
                client.close()
            }
        } catch(Exception e) {
            log.error("Error closing ElasticSearch client: ", e)
        }
    }

    /**
     * Start the periodic DNS refresh, running immediately and then every {@link #DNS_REFRESH_TIME} ms.
     * Any previously started timer is cancelled first.
     */
    void startDnsRefresherTask() {
        if (dnsRefresherTimer != null) {
            dnsRefresherTimer.cancel()
        }
        // Daemon thread: the refresher must never be the thing that keeps the JVM from exiting.
        dnsRefresherTimer = new Timer("elasticsearch-dns-refresher", true)
        dnsRefresherTimer.scheduleAtFixedRate(new DnsRefresherTask(), 0, DNS_REFRESH_TIME)
        log.info("Started elasticsearch DNS refresh timer task")
    }

    /**
     * Task to check for new IPs attached to our Found hostname every minute and add them to the transport client,
     * to prevent NoNodeAvailableExceptions.
     *
     * We got this exception because we looked up a single IP once on boot, and used it for the lifetime of the service,
     * while Found says they change the IPs over time, so old ones will eventually stop working without warning.
     *
     * The TransportClient checks for duplicates, so we just keep adding all IPs we find. A reachability check is done
     * by the client too, so we don't check that ourselves.
     *
     * We use our own copy of DnsNameService, so that cached IPs don't get used. And we don't have to change the system-level
     * caching which might break other services.
     *
     * NOTE(review): sun.net.spi.nameservice.dns.DNSNameService is a JDK-internal class; confirm it is available
     * on the target JDK.
     *
     * Discussion about this: https://discuss.elastic.co/t/nonodeavailableexception-with-java-transport-client/37702
     */
    class DnsRefresherTask extends TimerTask {
        // we try to use our own name service first, so we can skip caching without breaking the rest of the system
        // by setting the dns ttl too low
        DNSNameService nameService = new DNSNameService()

        @Override
        void run() {
            try {
                String[] splitted = elasticsearchTransportUri.split(":")
                String host = splitted[0]
                int port = defaultPort
                if (splitted.length > 1) {
                    port = Integer.parseInt(splitted[1])
                }

                Set<InetAddress> addresses = new HashSet<>()
                try {
                    // don't try use DNS if we're looking for localhost or already have an IP.
                    // An explicit IPv4-literal match is used: testing only whether the first character is a
                    // hex digit would also skip real host names that merely start with 0-9 or a-f.
                    boolean ipLiteral = host ==~ /\d{1,3}(\.\d{1,3}){3}/
                    if (host != "localhost" && !ipLiteral) {
                        addresses.addAll(nameService.lookupAllHostAddr(host))
                    }
                } catch(Exception ex) {
                    log.error("Internal name service failed, trying system one", ex)
                }

                // fall back to the system resolver (also handles localhost and IP literals)
                if (addresses.isEmpty()) {
                    addresses.addAll(InetAddress.getAllByName(host))
                }

                for (InetAddress addr : addresses) {
                    if (addr instanceof Inet4Address) {
                        log.debug("ES DNS REFRESH (" + client + "): adding " + addr)
                        client.addTransportAddress(new InetSocketTransportAddress(addr, port))
                    }
                }
            } catch(Exception ex) {
                // swallow so one bad run does not terminate the timer thread
                log.error("Failed to refresh elasticsearch DNS, will retry", ex)
            }
        }
    }
}
@imod
Copy link

imod commented Dec 23, 2015

The DnsRefresherTask is a nice idea!
However, `DNSNameService nameService = new DNSNameService()` seems to be a problem with Java 8: I'm not able to compile it.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment