Skip to content

Instantly share code, notes, and snippets.

Created November 1, 2011 05:36
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save anonymous/1330001 to your computer and use it in GitHub Desktop.
Save anonymous/1330001 to your computer and use it in GitHub Desktop.
This class consumes messages from the queue and indexes them using the bulk API.
package com.arosys.doqeap.esearch;
import com.arosys.doqeap.exception.KeyNotFoundException;
import com.arosys.doqeap.exception.ValueNotFoundException;
import com.arosys.doqeap.serviceconfiguration.ServiceConfiguration;
import com.arosys.logger.LoggerFactory;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.*;
import org.apache.log4j.Logger;
import static org.elasticsearch.client.Requests.*;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.action.bulk.BulkRequestBuilder;
import org.elasticsearch.common.collect.Lists;
import org.json.JSONException;
import org.json.JSONObject;
/**
 * Consumes messages from a RabbitMQ queue and indexes each one into
 * Elasticsearch through the bulk API.
 */
public class IndexCreator {
    /** Milliseconds to wait for the next delivery before treating the queue as drained. */
    private static final long DELIVERY_TIMEOUT_MS = 2000L;

    /**
     * Message bodies are parsed as JSON, so decode them explicitly as UTF-8
     * instead of relying on the platform default charset.
     */
    private static final java.nio.charset.Charset PAYLOAD_CHARSET =
            java.nio.charset.Charset.forName("UTF-8");

    private final Logger logger;
    // Never assigned in this class; prepareIndex() works on its own parameter of the same name.
    private ServiceConfiguration serviceconfig;
    private String exchangeName;
    private String queueName;
    private String routingKey;

    /** Creates an indexer whose logger is configured from {@code resources/log4j.xml}. */
    public IndexCreator() {
        logger = LoggerFactory.getLogger("IndexCreator", "resources/log4j.xml");
    }

    /**
     * Declares and binds the configured exchange and queue, then consumes
     * messages until no delivery arrives within {@link #DELIVERY_TIMEOUT_MS},
     * sending every message to Elasticsearch as a single-item bulk request.
     *
     * <p>Every received message is acknowledged, whether or not it could be
     * indexed; messages that fail are logged and dropped.
     *
     * @param serviceconfig source of the exchange, queue and routing-key names
     * @param channel       RabbitMQ channel used to declare, consume and ack
     * @param client        Elasticsearch client used to execute the bulk requests
     * @param indexName     target index
     * @param indexType     target document type
     * @return {@code true} if at least one message was indexed without failure;
     *         {@code false} otherwise, including when the configuration is incomplete
     * @throws IOException   if declaring/binding the exchange or queue, or
     *                       registering the consumer, fails
     * @throws JSONException never thrown by this implementation; kept in the
     *                       signature so existing callers still compile
     */
    public boolean prepareIndex(ServiceConfiguration serviceconfig, Channel channel, Client client,
            String indexName, String indexType) throws IOException, JSONException {
        final boolean durable = true;
        final boolean noAck = false;

        logger.info(">>>>>>>prepare index called>>>>>");
        // Without all three names the declare/bind calls below cannot work, so stop early.
        if (!loadQueueSettings(serviceconfig)) {
            return false;
        }

        channel.exchangeDeclare(exchangeName, "direct", durable);
        channel.queueDeclare(queueName, durable, false, false, null);
        channel.queueBind(queueName, exchangeName, routingKey);
        QueueingConsumer consumer = new QueueingConsumer(channel);
        String consumerTag = channel.basicConsume(queueName, noAck, consumer);
        logger.info("MQ setting completed");

        boolean indexedAny;
        try {
            indexedAny = drainQueue(consumer, channel, client, indexName, indexType);
        } finally {
            // The consumer is local to this call; leaving it registered would let the
            // broker keep delivering to an object nobody reads from any more.
            cancelQuietly(channel, consumerTag);
        }
        logger.info("exiting from while loop");
        logger.info("channel>>>" + channel);
        return indexedAny;
    }

    /** Reads exchange, queue and routing-key names from the configuration; false if any is unavailable. */
    private boolean loadQueueSettings(ServiceConfiguration serviceconfig) {
        try {
            exchangeName = serviceconfig.getValue("elaticsearch.exchangename");
            logger.info("prepareIndex() Elasic exchangeName>>>" + exchangeName);
            queueName = serviceconfig.getValue("elaticsearch.queuename");
            logger.info("prepareIndex() Elasic queueName>>>" + queueName);
            routingKey = serviceconfig.getValue("elaticsearch.routekeyname");
            logger.info("prepareIndex() Elasic routingKey>>>" + routingKey);
        } catch (ValueNotFoundException ex) {
            logger.error(ex.getMessage(), ex);
            return false;
        } catch (KeyNotFoundException ex) {
            logger.error(ex.getMessage(), ex);
            return false;
        }
        if (exchangeName == null || queueName == null || routingKey == null) {
            logger.error("prepareIndex() incomplete MQ configuration, nothing consumed");
            return false;
        }
        return true;
    }

    /** Pulls deliveries until the queue stays empty for one timeout; true if any message was indexed. */
    private boolean drainQueue(QueueingConsumer consumer, Channel channel, Client client,
            String indexName, String indexType) {
        boolean indexedAny = false;
        int counter = 0;
        while (true) {
            QueueingConsumer.Delivery task;
            try {
                task = consumer.nextDelivery(DELIVERY_TIMEOUT_MS);
            } catch (InterruptedException ie) {
                // Restore the flag so the caller can observe the interruption.
                Thread.currentThread().interrupt();
                logger.warn("interrupted while waiting for a delivery, stopping");
                break;
            } catch (RuntimeException re) {
                // E.g. the client's ShutdownSignalException once the channel is closed:
                // the failure would repeat on every call, so retrying would spin forever.
                logger.error("consumer can no longer receive deliveries, stopping-" + re.getMessage(), re);
                break;
            }
            if (task == null) {
                // Timed out: the queue is considered drained.
                break;
            }
            counter++;
            if (indexDelivery(task, client, indexName, indexType, counter)) {
                indexedAny = true;
            }
            // Ack in every case; failed messages are dropped rather than redelivered.
            ackQuietly(channel, task.getEnvelope().getDeliveryTag());
        }
        return indexedAny;
    }

    /** Validates one delivery as JSON and indexes it; returns true only if Elasticsearch reported no failure. */
    private boolean indexDelivery(QueueingConsumer.Delivery task, Client client,
            String indexName, String indexType, int counter) {
        final long deliveryTag = task.getEnvelope().getDeliveryTag();
        final byte[] body = task.getBody();
        if (body == null) {
            logger.warn("discarding delivery tag [" + deliveryTag + "], message has no body");
            return false;
        }

        final String payload = new String(body, PAYLOAD_CHARSET);
        try {
            // Parsed only to reject malformed JSON before it is sent to Elasticsearch.
            new JSONObject(payload);
        } catch (JSONException je) {
            logger.warn("discarding delivery tag [" + deliveryTag + "], body is not valid JSON-"
                    + je.getMessage());
            return false;
        }
        logger.info("message is successfully receiveed by consumer: " + payload);
        logger.debug("Counter: " + counter);

        try {
            IndexRequest request = indexRequest().index(indexName).type(indexType).source(body).create(true);
            BulkRequestBuilder bulk = client.prepareBulk();
            bulk.add(request);
            BulkResponse response = bulk.execute().actionGet();
            if (response.hasFailures()) {
                // The request reached the server, but the item itself was rejected.
                logger.warn("failed to index delivery tag [" + deliveryTag + "]-"
                        + response.buildFailureMessage());
                return false;
            }
            return true;
        } catch (Exception e) {
            logger.error("elastic server not started-" + e.getMessage(), e);
            logger.warn("failed to parse request for delivery tag [" + deliveryTag + "], ack'ing...");
            return false;
        }
    }

    /** Acknowledges a single delivery, logging instead of propagating any failure. */
    private void ackQuietly(Channel channel, long deliveryTag) {
        try {
            channel.basicAck(deliveryTag, false);
        } catch (Exception e) {
            logger.warn("failed to ack [" + deliveryTag + "]", e);
        }
    }

    /** Cancels the consumer registered by prepareIndex(), logging instead of propagating any failure. */
    private void cancelQuietly(Channel channel, String consumerTag) {
        try {
            channel.basicCancel(consumerTag);
        } catch (Exception e) {
            logger.warn("failed to cancel consumer [" + consumerTag + "]", e);
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment