Created
March 16, 2012 16:22
-
-
Save focampo/2050841 to your computer and use it in GitHub Desktop.
ES Clients Factory
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Factory that builds {@link ElasticSearchItemIndexer} instances on top of either
 * an ElasticSearch node client or an ElasticSearch transport client, depending on
 * the configured client type.
 */
class ElasticSearchItemIndexerFactory {

    private final static Logger logger = Logger.getLogger(ElasticSearchItemIndexerFactory.class)

    /** Value of {@code esConfig.clientType} that selects a node client. */
    public final static String NODE_TYPE = 'node'

    /** Value of {@code esConfig.clientType} that selects a transport client. */
    public final static String TRANSPORT_TYPE = 'transport'

    /**
     * Creates an item indexer for ElasticSearch.
     *
     * @param indexerConfig generic indexer configuration map
     * @param esConfig ElasticSearch-specific configuration map; this method reads
     *        {@code clientType}, {@code clusterName}, {@code node.local},
     *        {@code transport.port}, {@code transport.shards} and
     *        {@code transport.sniffCluster} from it
     * @param reindexScheduler scheduler handed over untouched to the indexer constructor
     * @return the indexer
     * @throws IllegalArgumentException if {@code esConfig.clientType} is neither
     *         {@link #NODE_TYPE} nor {@link #TRANSPORT_TYPE}
     */
    public static ElasticSearchItemIndexer createIndexer(indexerConfig, esConfig, reindexScheduler) {
        logger.info("Creating ElasticSearchItemIndexer of type [$esConfig.clientType]")
        switch (esConfig.clientType) {
            case NODE_TYPE:
                Node node = createNodeClient(esConfig.clusterName, esConfig.node.local)
                return new ElasticSearchItemIndexer(node, indexerConfig, reindexScheduler)
            case TRANSPORT_TYPE:
                def client = createTransportClient(esConfig.clusterName, esConfig.transport.port,
                        esConfig.transport.shards, esConfig.transport.sniffCluster)
                return new ElasticSearchItemIndexer(client, indexerConfig, reindexScheduler)
            default:
                // Fail fast: without this branch the method fell through and returned null,
                // deferring the failure to the first use of the indexer.
                String message = "Unsupported ElasticSearch client type [$esConfig.clientType]; " +
                        "expected '$NODE_TYPE' or '$TRANSPORT_TYPE'"
                throw new IllegalArgumentException(message)
        }
    }

    /**
     * Creates a TransportClient instance.
     * This client issues requests in round-robin fashion across all the given hosts.
     *
     * @param clusterName name of the cluster, stored under the {@code cluster.name} setting
     * @param port port used to connect as a client; the same port is used for every host
     * @param hosts ES servers inside the cluster. They may be just the masters
     * @param sniffCluster if 'true', the network is sniffed to detect servers added to the
     *        cluster (stored under the {@code client.transport.sniff} setting)
     * @return a client of type TransportClient
     */
    protected static synchronized Client createTransportClient(clusterName, port, hosts, sniffCluster) {
        if (logger.isDebugEnabled()) {
            logger.debug("Creating ElasticSearch Transport Client for cluster [$clusterName]")
        }
        Settings settings = ImmutableSettings.settingsBuilder()
                .put("cluster.name", clusterName)
                .put("node.client", true)
                .put("client.transport.sniff", sniffCluster)
                .build()
        TransportClient client = new TransportClient(settings)
        // Register every configured host as a transport address of the client.
        hosts.each { host ->
            client.addTransportAddress(new InetSocketTransportAddress(host, port))
        }
        return client
    }

    /**
     * Builds an ES client of type Node. This client becomes part of the cluster
     * but does not store data, so it knows which servers to route information to.
     *
     * NOTE(review): the node is only built here, never explicitly started; presumably
     * the caller is responsible for starting it - confirm against ElasticSearchItemIndexer.
     *
     * @param clusterName name of the cluster to join
     * @param local value passed to the builder's {@code local(...)} option
     * @return the built {@code Node} instance
     */
    protected static synchronized Node createNodeClient(clusterName, local) {
        if (logger.isDebugEnabled()) {
            logger.debug("Creating ElasticSearch Node Client for cluster [$clusterName]")
        }
        org.elasticsearch.node.NodeBuilder builder = org.elasticsearch.node.NodeBuilder.nodeBuilder()
                .clusterName(clusterName)
                .local(local)
                .client(true)
        return builder.build()
    }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Tests for {@link ElasticSearchItemIndexerFactory}: checks that the factory builds an
 * indexer with the expected kind of ElasticSearch client for each supported client type.
 */
class ElasticSearchItemIndexerFactoryTest {

    final static String CLUSTER_1 = 'sergiodenis'
    final static String CLUSTER_2 = 'macgyver'
    final static String NODE_TYPE = 'node'
    final static String TRANSPORT_TYPE = 'transport'
    final static int TRANSPORT_PORT = 9300 //client port of ES

    /** Hosts handed to the transport client through the configuration. */
    def shardsMock = ['localhost']
    ReindexScheduler reindexSchedulerMock
    def config
    ElasticSearchItemIndexer indexer

    /**
     * IMPORTANT: ALWAYS CONFIGURE THE CLIENT AS 'LOCAL:TRUE' FOR TESTING
     */
    @Before
    public void setUp() throws Exception {
        config = [
            item: [attributes: ['id', 'title']],
            restclient : [name : 'rest-cli'],
            indexer : [
                type: ItemIndexerFactory.ES_INDEXER,
                recentlyClosedDays:5, lowQuantityMinimum: 1, lowQuantityAmount: 3,
                remove : [months: 2]
            ],
            es: [
                clusterName : CLUSTER_1,
                clientType: NODE_TYPE,
                node: [local: true],
                transport: [port: TRANSPORT_PORT, shards:shardsMock, sniffCluster:true]
            ]
        ]
        reindexSchedulerMock = null
    }

    /**
     * Releases the configuration and shuts down the indexer created by the test, if any.
     *
     * @throws java.lang.Exception
     */
    @After
    public void tearDown() throws Exception {
        config = null
        indexer?.shutdown()
    }

    /** A 'node' client type must yield an indexer backed by a NodeClient. */
    @Test
    public void testCreateIndexer_Node() {
        config.es.clientType = NODE_TYPE
        indexer = ElasticSearchItemIndexerFactory.createIndexer(config.indexer as ConfigObject,
                config.es as ConfigObject,
                reindexSchedulerMock)
        Assert.assertTrue(indexer instanceof ElasticSearchItemIndexer)
        Assert.assertTrue(indexer.getClient() instanceof NodeClient)
        testConfigAttributes(indexer)
        def settings = indexer.getNode().settings()
        Assert.assertEquals(CLUSTER_1, settings.get('cluster.name'))
        Assert.assertTrue(Boolean.valueOf(settings.get('node.client')))
    }

    /** A 'transport' client type must yield an indexer backed by a TransportClient. */
    @Test
    public void testCreateIndexer_Transport() {
        config.es.clientType = TRANSPORT_TYPE
        config.es.clusterName = CLUSTER_2 //avoid forming a cluster with another node that may be running
        indexer = ElasticSearchItemIndexerFactory.createIndexer(config.indexer as ConfigObject,
                config.es as ConfigObject,
                reindexSchedulerMock)
        Assert.assertTrue(indexer instanceof ElasticSearchItemIndexer)
        testConfigAttributes(indexer)
        def client = indexer.getClient()
        Assert.assertTrue(client instanceof TransportClient)

        //Verify that every shard has been included as a possible host.
        //Work on a copy so the list referenced by the configuration is not mutated.
        def pendingShards = new ArrayList(shardsMock)
        client.transportAddresses().collect { it.address() }.each { address ->
            Assert.assertEquals(TRANSPORT_PORT, address.getPort())
            def shard = pendingShards.find { it == address.getHostName() }
            Assert.assertNotNull(shard)
            pendingShards.remove(shard)
        }
        //Without this check the loop passes vacuously when no address was registered at all.
        Assert.assertTrue("Shards not registered as transport addresses: $pendingShards".toString(),
                pendingShards.isEmpty())
    }

    /** Checks that the indexer exposes the values found in the generic indexer configuration. */
    private void testConfigAttributes(indexer) {
        Assert.assertEquals(config.indexer.recentlyClosedDays, indexer.recentlyClosedDays)
        Assert.assertEquals(config.indexer.lowQuantityAmount, indexer.lowQuantityAmount)
        Assert.assertEquals(config.indexer.lowQuantityMinimum, indexer.lowQuantityMinimum)
        Assert.assertEquals(config.indexer.remove.months, indexer.removeMonths)
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment