Created
November 1, 2011 05:36
-
-
Save anonymous/1330001 to your computer and use it in GitHub Desktop.
This class will consume the queue messages and create index using bulk api
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.arosys.doqeap.esearch; | |
import com.arosys.doqeap.exception.KeyNotFoundException; | |
import com.arosys.doqeap.exception.ValueNotFoundException; | |
import com.arosys.doqeap.serviceconfiguration.ServiceConfiguration; | |
import com.arosys.logger.LoggerFactory; | |
import com.rabbitmq.client.*; | |
import java.io.IOException; | |
import java.util.*; | |
import org.apache.log4j.Logger; | |
import static org.elasticsearch.client.Requests.*; | |
import org.elasticsearch.action.bulk.BulkResponse; | |
import org.elasticsearch.action.index.IndexRequest; | |
import org.elasticsearch.client.Client; | |
import org.elasticsearch.client.action.bulk.BulkRequestBuilder; | |
import org.elasticsearch.common.collect.Lists; | |
import org.json.JSONException; | |
import org.json.JSONObject; | |
public class IndexCreator { | |
private Logger logger; | |
private ServiceConfiguration serviceconfig; | |
private String exchangeName; | |
private String queueName; | |
private String routingKey; | |
public IndexCreator() | |
{ | |
logger=LoggerFactory.getLogger("IndexCreator","resources/log4j.xml"); | |
} // IndexCreator() method | |
public boolean prepareIndex(ServiceConfiguration serviceconfig,Channel channel,Client client,String indexName,String indexType) throws IOException, JSONException | |
{ | |
boolean runInfinite = true; | |
boolean prepareindexstatus=false; | |
String s = null; | |
JSONObject jsonObject = null; | |
boolean durable = true; | |
boolean noAck = false; | |
QueueingConsumer.Delivery task = null; | |
int counter = 0; | |
logger.info(">>>>>>>prepare index called>>>>>"); | |
try { | |
exchangeName =serviceconfig.getValue("elaticsearch.exchangename"); | |
logger.info("prepareIndex() Elasic exchangeName>>>"+exchangeName); | |
queueName =serviceconfig.getValue("elaticsearch.queuename"); | |
logger.info("prepareIndex() Elasic queueName>>>"+queueName); | |
routingKey =serviceconfig.getValue("elaticsearch.routekeyname"); | |
logger.info("prepareIndex() Elasic routingKey>>>"+routingKey); | |
} catch (ValueNotFoundException ex) { | |
logger.error(ex.getMessage()); | |
} catch (KeyNotFoundException ex) { | |
logger.error(ex.getMessage()); | |
} | |
channel.exchangeDeclare(exchangeName, "direct", durable); | |
channel.queueDeclare(queueName, durable,false,false,null); | |
channel.queueBind(queueName, exchangeName, routingKey); | |
QueueingConsumer consumer = new QueueingConsumer(channel); | |
channel.basicConsume(queueName, noAck, consumer); | |
logger.info("MQ setting completed"); | |
while (runInfinite) | |
{ | |
try{ | |
task = null; | |
task= consumer.nextDelivery(2000); | |
counter++; | |
if(task==null) | |
{ | |
break; | |
} | |
s = new String(task.getBody()); | |
jsonObject = new JSONObject(s); | |
logger.info("message is successfully receiveed by consumer: "+task.getBody().toString()); | |
logger.debug("Counter: "+counter); | |
if (task != null && task.getBody() != null) | |
{ | |
final List<Long> deliveryTags = Lists.newArrayList(); | |
BulkRequestBuilder bulkRequestBuilder = client.prepareBulk(); | |
try | |
{ | |
IndexRequest ir=indexRequest().index(indexName).type(indexType).source(task.getBody()).create(true); | |
bulkRequestBuilder.add(ir); | |
BulkResponse response = bulkRequestBuilder.execute().actionGet(); | |
prepareindexstatus=true; | |
} | |
catch (Exception e) | |
{ | |
logger.error("elastic server not started-"+e.getMessage()); | |
logger.warn("failed to parse request for delivery tag [{}], ack'ing..."+ task.getEnvelope().getDeliveryTag()); | |
try { | |
channel.basicAck(task.getEnvelope().getDeliveryTag(), false); | |
}catch (Exception e1){ | |
logger.warn("failed to ack [{}]"+ task.getEnvelope().getDeliveryTag()); | |
} | |
continue; | |
} | |
deliveryTags.add(task.getEnvelope().getDeliveryTag()); | |
}//end if | |
channel.basicAck(task.getEnvelope().getDeliveryTag(), false); | |
} catch (Exception ie) { | |
continue; | |
} | |
}//while | |
logger.info("exiting from while loop"); | |
logger.info("channel>>>"+channel); | |
return prepareindexstatus; | |
}//method | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment