Skip to content

Instantly share code, notes, and snippets.

@focampo
Created March 16, 2012 16:22
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save focampo/2050841 to your computer and use it in GitHub Desktop.
Save focampo/2050841 to your computer and use it in GitHub Desktop.
ES Clients Factory
/**
 * Factory for {@link ElasticSearchItemIndexer} instances.
 *
 * Depending on {@code esConfig.clientType}, the indexer is backed either by an
 * ElasticSearch node client ({@link #NODE_TYPE}) or by a transport client
 * ({@link #TRANSPORT_TYPE}).
 */
class ElasticSearchItemIndexerFactory {

    private final static Logger logger = Logger.getLogger(ElasticSearchItemIndexerFactory.class)

    /** Value of {@code esConfig.clientType} that selects a node client. */
    public final static String NODE_TYPE = 'node'

    /** Value of {@code esConfig.clientType} that selects a transport client. */
    public final static String TRANSPORT_TYPE = 'transport'

    /**
     * Creates an item indexer for ElasticSearch.
     *
     * @param indexerConfig generic indexer configuration map
     * @param esConfig ElasticSearch-specific configuration map; this method reads
     *        {@code clientType}, {@code clusterName}, {@code node.local} and
     *        {@code transport.port / shards / sniffCluster} from it
     * @param reindexScheduler scheduler handed to the indexer as-is
     * @return the indexer
     * @throws IllegalArgumentException if {@code esConfig.clientType} is neither
     *         {@link #NODE_TYPE} nor {@link #TRANSPORT_TYPE}
     */
    public static ElasticSearchItemIndexer createIndexer(indexerConfig, esConfig, reindexScheduler) {
        logger.info("Creating ElasticSearchItemIndexer of type [$esConfig.clientType]")
        switch (esConfig.clientType) {
            case NODE_TYPE:
                Node node = createNodeClient(esConfig.clusterName, esConfig.node.local)
                return new ElasticSearchItemIndexer(node, indexerConfig, reindexScheduler)
            case TRANSPORT_TYPE:
                def client = createTransportClient(esConfig.clusterName, esConfig.transport.port,
                        esConfig.transport.shards, esConfig.transport.sniffCluster)
                return new ElasticSearchItemIndexer(client, indexerConfig, reindexScheduler)
            default:
                // Fail loudly instead of falling out of the switch and returning null,
                // which would only surface later as a NullPointerException in the caller.
                String message = "Unsupported ElasticSearch client type [${esConfig.clientType}]; expected '${NODE_TYPE}' or '${TRANSPORT_TYPE}'"
                throw new IllegalArgumentException(message)
        }
    }

    /**
     * Creates a TransportClient instance.
     * This client issues requests in round-robin mode across all the configured hosts.
     *
     * @param clusterName name of the cluster to connect to
     * @param port client connection port
     * @param hosts ES servers inside the cluster; listing only the masters is enough
     * @param sniffCluster if 'true', the network is sniffed to detect servers added to the cluster
     * @return a client of type TransportClient
     */
    protected static synchronized Client createTransportClient(clusterName, port, hosts, sniffCluster) {
        if (logger.isDebugEnabled()) logger.debug("Creating ElasticSearch Transport Client for cluster [$clusterName]")
        Settings settings = ImmutableSettings.settingsBuilder().put("cluster.name", clusterName)
                .put("node.client", true)
                .put("client.transport.sniff", sniffCluster)
                .build()
        Client client = new TransportClient(settings)
        // Every configured host is registered on the same port.
        hosts.each { host ->
            client.addTransportAddress(new InetSocketTransportAddress(host, port))
        }
        return client
    }

    /**
     * Builds an ES client of type Node. This client becomes part of the cluster
     * without storing data, so it knows which servers to route information to.
     *
     * NOTE(review): the node is only built here, not started; presumably
     * ElasticSearchItemIndexer starts it - confirm.
     *
     * @param clusterName name of the cluster to join
     * @param local value passed to the builder's {@code local} setting
     * @return a Node instance configured as a client
     */
    protected static synchronized Node createNodeClient(clusterName, local) {
        if (logger.isDebugEnabled()) logger.debug("Creating ElasticSearch Node Client for cluster [$clusterName]")
        org.elasticsearch.node.NodeBuilder builder = org.elasticsearch.node.NodeBuilder.nodeBuilder()
                .clusterName(clusterName)
                .local(local)
                .client(true)
        return builder.build()
    }
}
/**
 * Tests for {@link ElasticSearchItemIndexerFactory}: checks that the factory builds
 * an indexer backed by the requested client type and passes the indexer settings through.
 */
class ElasticSearchItemIndexerFactoryTest {

    final static String CLUSTER_1 = 'sergiodenis'
    final static String CLUSTER_2 = 'macgyver'
    final static String NODE_TYPE = 'node'
    final static String TRANSPORT_TYPE = 'transport'
    final static int TRANSPORT_PORT = 9300 //client port of ES

    /** Hosts handed to the transport client through {@code config.es.transport.shards}. */
    def shardsMock = ['localhost']
    ReindexScheduler reindexSchedulerMock
    def config
    ElasticSearchItemIndexer indexer

    /**
     * IMPORTANT: ALWAYS CONFIGURE THE CLIENT AS 'LOCAL:TRUE' FOR TESTING
     */
    @Before
    public void setUp() throws Exception {
        config = [
            item: [attributes: ['id', 'title']],
            restclient : [name : 'rest-cli'],
            indexer : [
                type: ItemIndexerFactory.ES_INDEXER,
                recentlyClosedDays:5, lowQuantityMinimum: 1, lowQuantityAmount: 3,
                remove : [months: 2]
            ],
            es: [
                clusterName : CLUSTER_1,
                clientType: NODE_TYPE,
                node: [local: true],
                transport: [port: TRANSPORT_PORT, shards:shardsMock, sniffCluster:true]
            ]
        ]
        reindexSchedulerMock = null
    }

    /**
     * Drops the configuration and shuts down the indexer created by the test, if any.
     *
     * @throws java.lang.Exception
     */
    @After
    public void tearDown() throws Exception {
        config = null
        indexer?.shutdown()
    }

    /** A 'node' client type must yield an indexer backed by a NodeClient in client mode. */
    @Test
    public void testCreateIndexer_Node() {
        config.es.clientType = NODE_TYPE
        indexer = ElasticSearchItemIndexerFactory.createIndexer(config.indexer as ConfigObject,
                config.es as ConfigObject,
                reindexSchedulerMock)
        Assert.assertTrue(indexer instanceof ElasticSearchItemIndexer)
        Assert.assertTrue(indexer.getClient() instanceof NodeClient)
        testConfigAttributes(indexer)
        def settings = indexer.getNode().settings()
        Assert.assertEquals(CLUSTER_1, settings.get('cluster.name'))
        Assert.assertTrue(Boolean.valueOf(settings.get('node.client')))
    }

    /** A 'transport' client type must yield a TransportClient registered against every configured shard. */
    @Test
    public void testCreateIndexer_Transport() {
        config.es.clientType = TRANSPORT_TYPE
        config.es.clusterName = CLUSTER_2 // avoid forming a cluster with another node that may be running
        indexer = ElasticSearchItemIndexerFactory.createIndexer(config.indexer as ConfigObject,
                config.es as ConfigObject,
                reindexSchedulerMock)
        Assert.assertTrue(indexer instanceof ElasticSearchItemIndexer)
        testConfigAttributes(indexer)
        def client = indexer.getClient()
        Assert.assertTrue(client instanceof TransportClient)

        // Verify that every shard was registered as a possible host. Work on a copy so
        // the shared fixture (also referenced from config) is not mutated by the check.
        def pendingShards = new ArrayList(shardsMock)
        client.transportAddresses().collect { it.address() }.each { address ->
            Assert.assertEquals(TRANSPORT_PORT, address.getPort())
            def shard = pendingShards.find { it == address.getHostName() }
            Assert.assertNotNull(shard)
            pendingShards.remove(shard)
        }
        // Without this, the test would pass even if no transport address was registered.
        Assert.assertTrue("Shards not registered as transport addresses: " + pendingShards,
                pendingShards.isEmpty())
    }

    /** Asserts that the indexer exposes the values configured under {@code config.indexer}. */
    private void testConfigAttributes(indexer) {
        Assert.assertEquals(config.indexer.recentlyClosedDays, indexer.recentlyClosedDays)
        Assert.assertEquals(config.indexer.lowQuantityAmount, indexer.lowQuantityAmount)
        Assert.assertEquals(config.indexer.lowQuantityMinimum, indexer.lowQuantityMinimum)
        Assert.assertEquals(config.indexer.remove.months, indexer.removeMonths)
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment