Skip to content

Instantly share code, notes, and snippets.

@sameek
Created January 19, 2012 05:57
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save sameek/1638251 to your computer and use it in GitHub Desktop.
Save sameek/1638251 to your computer and use it in GitHub Desktop.
RabbitMQ River With Custom Analyzer
{
"index" : {
"analysis" : {
"analyzer" : {
"customindexanalyzer" : {
"type":"custom",
"tokenizer" : "whitespace",
"filter" : ["lowercase","asciifolding","length","mystopword"],
"char_filter" :["html_strip"]
},
"customsearchanalyzer" : {
"type":"custom",
"tokenizer" : "whitespace",
"filter" : ["lowercase","asciifolding","length","mystopword"],
"char_filter" :["html_strip"]
}
},
"filter" : {
"mystopword": {
"type" : "stop",
"stopwords_path" :"F:/resources/stopwordeng.txt" ,
"ignore_case":true
}
}
}
}
}
/*
 * RabbitMqRiverIndexer: creates an Elasticsearch index with custom analyzers,
 * registers a RabbitMQ river, and publishes JSON documents to RabbitMQ for indexing.
 */
package com.arosys.doqeap.esearch;
import com.arosys.configurationreader.ConfigurationReader;
import com.arosys.customexception.CustomException;
import com.arosys.doqeap.exception.ValueNotFoundException;
import com.arosys.logger.LoggerFactory;
import com.rabbitmq.client.Channel;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import static org.elasticsearch.common.xcontent.XContentFactory.*;
import org.apache.log4j.Logger;
import org.elasticsearch.action.TransportActions.Admin.Indices;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.transport.RemoteTransportException;
import org.json.JSONException;
import org.json.JSONObject;
/**
 * Creates an Elasticsearch index configured with custom index/search analyzers,
 * registers a RabbitMQ river ({@code _river/myriver/_meta}) that reads from the configured
 * queue, and publishes the supplied JSON documents to RabbitMQ as bulk-format messages.
 *
 * @author Sameek
 */
public class RabbitMqRiverIndexer
{
    /** Charset used to encode bulk messages published to RabbitMQ. */
    private static final String MESSAGE_CHARSET = "UTF-8";

    /**
     * Index settings defining "customindexanalyzer" and "customsearchanalyzer": whitespace
     * tokenizer, html_strip char filter, and lowercase/asciifolding/length plus a file-based,
     * case-insensitive stop filter ("mystopword").
     * NOTE(review): the stopwords path is machine-specific; consider moving it to configuration.
     */
    private static final String ANALYZER_SETTINGS =
            "{\n"
            + "  \"index\" : {\n"
            + "    \"analysis\" : {\n"
            + "      \"analyzer\" : {\n"
            + "        \"customindexanalyzer\" : {\n"
            + "          \"type\" : \"custom\",\n"
            + "          \"tokenizer\" : \"whitespace\",\n"
            + "          \"filter\" : [\"lowercase\", \"asciifolding\", \"length\", \"mystopword\"],\n"
            + "          \"char_filter\" : [\"html_strip\"]\n"
            + "        },\n"
            + "        \"customsearchanalyzer\" : {\n"
            + "          \"type\" : \"custom\",\n"
            + "          \"tokenizer\" : \"whitespace\",\n"
            + "          \"filter\" : [\"lowercase\", \"asciifolding\", \"length\", \"mystopword\"],\n"
            + "          \"char_filter\" : [\"html_strip\"]\n"
            + "        }\n"
            + "      },\n"
            + "      \"filter\" : {\n"
            + "        \"mystopword\" : {\n"
            + "          \"type\" : \"stop\",\n"
            + "          \"stopwords_path\" : \"F:/resources/stopwordeng.txt\",\n"
            + "          \"ignore_case\" : true\n"
            + "        }\n"
            + "      }\n"
            + "    }\n"
            + "  }\n"
            + "}";

    private Logger logger = null;

    RabbitMqRiverIndexer() {
        logger = LoggerFactory.getLogger("RabbitMqRiverIndexer", "resources/log4j.xml");
    }

    /**
     * Sets up the index and RabbitMQ river, then publishes every document for indexing.
     *
     * @param elasticServiceConfig source of the rabbitmq.* / rabbitmqriver.* / rabbitmqriverindex.* settings
     * @param client               Elasticsearch client used to create the index, mapping and river
     * @param channel              RabbitMQ channel the bulk messages are published on
     * @param jsonDoc              documents; each element's {@code toString()} must be a JSON object
     * @param indexName            target index name
     * @param indexType            target document type
     * @param primaryColumnName    JSON field whose value is used as the document {@code _id}
     * @return {@code true} when all steps completed (failures are thrown, never returned)
     * @throws CustomException wrapping any failure (configuration, JSON, I/O, Elasticsearch)
     */
    public boolean riverIndex(ConfigurationReader elasticServiceConfig, Client client, Channel channel, List jsonDoc, String indexName, String indexType, Object primaryColumnName) throws CustomException
    {
        try
        {
            String host = elasticServiceConfig.get("rabbitmq.host");
            logger.info("host-" + host);
            String port = elasticServiceConfig.get("rabbitmq.port");
            logger.info("port-" + port);
            String user = elasticServiceConfig.get("rabbitmq.username");
            logger.info("user-" + user);
            String pass = elasticServiceConfig.get("rabbitmq.password");
            // Never write the credential itself to the log.
            logger.info("pass-****");
            String vhost = elasticServiceConfig.get("rabbitmq.virtualhost");
            logger.info("vhost-" + vhost);
            String queue = elasticServiceConfig.get("rabbitmqriver.queue");
            logger.info("queue-" + queue);
            String exchange = elasticServiceConfig.get("rabbitmqriver.exchange");
            logger.info("exchange-" + exchange);
            String routing_key = elasticServiceConfig.get("rabbitmqriver.routingkey");
            logger.info("routing_key-" + routing_key);
            String exchange_type = elasticServiceConfig.get("rabbitmqriver.exchangetype");
            logger.info("exchange_type-" + exchange_type);
            String exchange_durable = elasticServiceConfig.get("rabbitmqriver.exchangedurable");
            logger.info("exchange_durable-" + exchange_durable);
            String queue_durable = elasticServiceConfig.get("rabbitmqriver.queuedurable");
            logger.info("queue_durable-" + queue_durable);
            String queue_auto_delete = elasticServiceConfig.get("rabbitmqriver.queueautodelete");
            logger.info("queue_auto_delete-" + queue_auto_delete);
            String bulk_size = elasticServiceConfig.get("rabbitmqriverindex.bulksize");
            logger.info("bulk_size-" + bulk_size);
            String bulk_timeout = elasticServiceConfig.get("rabbitmqriverindex.bulktimeout");
            logger.info("bulk_timeout-" + bulk_timeout);
            String ordered = elasticServiceConfig.get("rabbitmqriverindex.ordered");
            logger.info("ordered-" + ordered);

            // "rabbitmq" section of the river _meta document.
            Map<String, Object> rabbitmqSettings = new HashMap<String, Object>();
            rabbitmqSettings.put("host", host);
            rabbitmqSettings.put("port", Integer.parseInt(port));
            rabbitmqSettings.put("user", user);
            rabbitmqSettings.put("pass", pass);
            rabbitmqSettings.put("vhost", vhost);
            rabbitmqSettings.put("queue", queue);
            rabbitmqSettings.put("exchange", exchange);
            rabbitmqSettings.put("routing_key", routing_key);
            rabbitmqSettings.put("exchange_type", exchange_type);
            rabbitmqSettings.put("exchange_durable", exchange_durable);
            rabbitmqSettings.put("queue_durable", queue_durable);
            rabbitmqSettings.put("queue_auto_delete", queue_auto_delete);

            // "index" section of the river _meta document.
            Map<String, Object> indexSettings = new HashMap<String, Object>();
            indexSettings.put("bulk_size", Integer.parseInt(bulk_size));
            indexSettings.put("bulk_timeout", bulk_timeout);
            indexSettings.put("ordered", ordered);

            createIndexWithAnalyzers(client, indexName, indexType);
            registerRiver(client, rabbitmqSettings, indexSettings);
            publishDocuments(channel, jsonDoc, indexName, indexType, primaryColumnName, exchange, routing_key);
            return true;
        }
        catch (Exception ex)
        {
            // Log with the stack trace; CustomException (project type) is built from the message only.
            // NOTE(review): if CustomException can take a cause, pass ex to preserve it.
            logger.error("riverIndex failed for index " + indexName, ex);
            throw new CustomException(ex.getMessage(), getClass());
        }
    }

    /** Creates {@code indexName} with the custom analyzers and assigns them to {@code indexType}. */
    private void createIndexWithAnalyzers(Client client, String indexName, String indexType)
    {
        logger.info("analyzerSettingsource:::\n" + ANALYZER_SETTINGS);
        // NOTE(review): presumably fails when the index already exists, so repeated calls for
        // the same index would throw; confirm whether callers expect that.
        CreateIndexResponse createResponse = client.admin().indices()
                .create(new CreateIndexRequest(indexName).settings(ANALYZER_SETTINGS))
                .actionGet();
        logger.info("createindexresponseactionGet::" + createResponse);

        // quote() escapes the type name so it cannot break the mapping JSON.
        String indexAnalyzerMapping = "{\n"
                + JSONObject.quote(indexType) + " : {\n"
                + " \"index_analyzer\" : \"customindexanalyzer\",\n"
                + "\"search_analyzer\" : \"customsearchanalyzer\"\n"
                + "}\n"
                + "}";
        logger.info("indexAnalyzerMapping:::" + indexAnalyzerMapping);
        PutMappingResponse mappingResponse = client.admin().indices()
                .preparePutMapping(indexName)
                .setType(indexType)
                .setSource(indexAnalyzerMapping)
                .execute()
                .actionGet();
        logger.info("put mapping acknowledged::" + mappingResponse.getAcknowledged());
    }

    /** Writes the river definition document {@code _river/myriver/_meta}. */
    private void registerRiver(Client client, Map<String, Object> rabbitmqSettings, Map<String, Object> indexSettings) throws IOException
    {
        client.prepareIndex("_river", "myriver", "_meta")
                .setSource(jsonBuilder()
                        .startObject()
                        .field("type", "rabbitmq")
                        .field("rabbitmq", rabbitmqSettings)
                        .field("index", indexSettings)
                        .endObject())
                .execute()
                .actionGet();
    }

    /** Publishes one bulk "index" message per document to {@code exchange}/{@code routingKey}. */
    private void publishDocuments(Channel channel, List jsonDoc, String indexName, String indexType, Object primaryColumnName, String exchange, String routingKey) throws JSONException, IOException
    {
        for (Object doc : jsonDoc)
        {
            String jsondoc = doc.toString();
            String uniqueId = new JSONObject(jsondoc).get(primaryColumnName.toString()).toString();
            logger.info("uniqueId--" + uniqueId);
            String message = buildBulkMessage(indexName, indexType, uniqueId, jsondoc);
            logger.info("River Message-" + message);
            // Explicit charset: the no-arg getBytes() depends on the platform default.
            channel.basicPublish(exchange, routingKey, null, message.getBytes(MESSAGE_CHARSET));
            logger.info("sucessfully publish message in rabbitmqriver");
        }
    }

    /**
     * Builds a newline-delimited bulk body with a single "index" action for the document.
     * The action line is generated with JSONObject so the values are JSON-escaped, and the
     * body ends with '\n' because the bulk format requires every line to be terminated.
     * (The previous version also appended an unterminated duplicate "create" action for the
     * same _id.)
     */
    private static String buildBulkMessage(String indexName, String indexType, String uniqueId, String jsondoc) throws JSONException
    {
        JSONObject target = new JSONObject();
        target.put("_index", indexName);
        target.put("_type", indexType);
        target.put("_id", uniqueId);
        JSONObject action = new JSONObject();
        action.put("index", target);
        // The source is wrapped under the type name, as in the original message format.
        return action.toString() + "\n"
                + "{" + JSONObject.quote(indexType) + " :" + jsondoc + "}\n";
    }
}
String data="inc";
String indexColumn="Description";
//prepare query string builder
QueryStringQueryBuilder querystring = new QueryStringQueryBuilder(data);
SearchResponse searchResponse = client.prepareSearch(indexName).setTypes(indexType).setSearchType(SearchType.DEFAULT) .setQuery(querystring.field(indexColumn))
.setExplain(true).setSize(100).execute().actionGet();
is
the
an
a
to
and
or
of
for
Inc
computer
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment