Created
January 19, 2012 05:57
-
-
Save sameek/1638251 to your computer and use it in GitHub Desktop.
RabbitMQ River With Custom Analyzer
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{ | |
"index" : { | |
"analysis" : { | |
"analyzer" : { | |
"customindexanalyzer" : { | |
"type":"custom", | |
"tokenizer" : "whitespace", | |
"filter" : ["lowercase","asciifolding","length","mystopword"], | |
"char_filter" :["html_strip"] | |
}, | |
"customsearchanalyzer" : { | |
"type":"custom", | |
"tokenizer" : "whitespace", | |
"filter" : ["lowercase","asciifolding","length","mystopword"], | |
"char_filter" :["html_strip"] | |
} | |
}, | |
"filter" : { | |
"mystopword": { | |
"type" : "stop", | |
"stopwords_path" :"F:/resources/stopwordeng.txt" , | |
"ignore_case":true | |
} | |
} | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"test_type" : { | |
"index_analyzer" : "customindexanalyzer", | |
"search_analyzer" : "customsearchanalyzer" | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
* To change this template, choose Tools | Templates | |
* and open the template in the editor. | |
*/ | |
package com.arosys.doqeap.esearch; | |
import com.arosys.configurationreader.ConfigurationReader; | |
import com.arosys.customexception.CustomException; | |
import com.arosys.doqeap.exception.ValueNotFoundException; | |
import com.arosys.logger.LoggerFactory; | |
import com.rabbitmq.client.Channel; | |
import java.io.IOException; | |
import java.util.HashMap; | |
import java.util.Iterator; | |
import java.util.List; | |
import java.util.Map; | |
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; | |
import static org.elasticsearch.common.xcontent.XContentFactory.*; | |
import org.apache.log4j.Logger; | |
import org.elasticsearch.action.TransportActions.Admin.Indices; | |
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; | |
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest; | |
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingResponse; | |
import org.elasticsearch.client.Client; | |
import org.elasticsearch.common.settings.ImmutableSettings; | |
import org.elasticsearch.common.xcontent.XContentBuilder; | |
import org.elasticsearch.common.xcontent.XContentFactory; | |
import org.elasticsearch.transport.RemoteTransportException; | |
import org.json.JSONException; | |
import org.json.JSONObject; | |
/** | |
* | |
* @author Sameek | |
*/ | |
public class RabbitMqRiverIndexer | |
{ | |
private Logger logger = null; | |
RabbitMqRiverIndexer() { | |
logger = LoggerFactory.getLogger("RabbitMqRiverIndexer", "resources/log4j.xml"); | |
} | |
public boolean riverIndex(ConfigurationReader elasticServiceConfig, Client client, Channel channel, List jsonDoc, String indexName, String indexType, Object primaryColumnName) throws CustomException | |
{ | |
boolean riverStatus = false; | |
try | |
{ | |
String host = elasticServiceConfig.get("rabbitmq.host"); | |
logger.info("host-" + host); | |
String port = elasticServiceConfig.get("rabbitmq.port"); | |
logger.info("port-" + port); | |
String user = elasticServiceConfig.get("rabbitmq.username"); | |
logger.info("user-" + user); | |
String pass = elasticServiceConfig.get("rabbitmq.password"); | |
logger.info("pass-" + pass); | |
String vhost = elasticServiceConfig.get("rabbitmq.virtualhost"); | |
logger.info("vhost-" + vhost); | |
String queue = elasticServiceConfig.get("rabbitmqriver.queue"); | |
logger.info("queue-" + queue); | |
String exchange = elasticServiceConfig.get("rabbitmqriver.exchange"); | |
logger.info("exchange-" + exchange); | |
String routing_key = elasticServiceConfig.get("rabbitmqriver.routingkey"); | |
logger.info("routing_key-" + routing_key); | |
String exchange_type = elasticServiceConfig.get("rabbitmqriver.exchangetype"); | |
logger.info("exchange_type-" + exchange_type); | |
String exchange_durable = elasticServiceConfig.get("rabbitmqriver.exchangedurable"); | |
logger.info("exchange_durable-" + exchange_durable); | |
String queue_durable = elasticServiceConfig.get("rabbitmqriver.queuedurable"); | |
logger.info("queue_durable-" + queue_durable); | |
String queue_auto_delete = elasticServiceConfig.get("rabbitmqriver.queueautodelete"); | |
logger.info("queue_auto_delete-" + queue_auto_delete); | |
String bulk_size = elasticServiceConfig.get("rabbitmqriverindex.bulksize"); | |
logger.info("bulk_size-" + bulk_size); | |
String bulk_timeout = elasticServiceConfig.get("rabbitmqriverindex.bulktimeout"); | |
logger.info("bulk_timeout-" + bulk_timeout); | |
String ordered = elasticServiceConfig.get("rabbitmqriverindex.ordered"); | |
logger.info("ordered-" + ordered); | |
//setting rabittmq river configuration | |
Map rabbitmqSettings = new HashMap(); | |
rabbitmqSettings.put("host", host); | |
rabbitmqSettings.put("port", Integer.parseInt(port)); | |
rabbitmqSettings.put("user", user); | |
rabbitmqSettings.put("pass", pass); | |
rabbitmqSettings.put("vhost", vhost); | |
rabbitmqSettings.put("queue", queue); | |
rabbitmqSettings.put("exchange", exchange); | |
rabbitmqSettings.put("routing_key", routing_key); | |
rabbitmqSettings.put("exchange_type", exchange_type); | |
rabbitmqSettings.put("exchange_durable", exchange_durable); | |
rabbitmqSettings.put("queue_durable", queue_durable); | |
rabbitmqSettings.put("queue_auto_delete", queue_auto_delete); | |
Map indexSettings = new HashMap(); | |
indexSettings.put("bulk_size", Integer.parseInt(bulk_size)); | |
indexSettings.put("bulk_timeout", bulk_timeout); | |
indexSettings.put("ordered", ordered); | |
String analyzerSettingsource= " { \n" + | |
" \"index\" : {\n"+ | |
"\"analysis\" : {\n"+ | |
"\"analyzer\" : {\n"+ | |
" \"customindexanalyzer\" : {\n"+ | |
"\"type\":\"custom\",\n"+ | |
"\"tokenizer\" : \"whitespace\",\n"+ | |
"\"filter\" : [\"lowercase\",\"asciifolding\",\"length\",\"mystopword\"],\n"+ | |
"\"char_filter\" :[\"html_strip\"]\n"+ | |
" },\n"+ | |
"\"customsearchanalyzer\" : {\n"+ | |
"\"type\":\"custom\",\n"+ | |
" \"tokenizer\" : \"whitespace\",\n"+ | |
"\"filter\" : [\"lowercase\",\"asciifolding\",\"length\",\"mystopword\"],\n"+ | |
"\"char_filter\" :[\"html_strip\"]\n"+ | |
"}\n"+ | |
"},\n"+ | |
"\"filter\" : {\n"+ | |
"\"mystopword\": {\n"+ | |
" \"type\" : \"stop\",\n"+ | |
"\"stopwords_path\" :\"F:/resources/stopwordeng.txt\" ,\n"+ | |
"\"ignore_case\":true\n"+ | |
"}\n"+ | |
"}\n"+ | |
"}\n"+ | |
"}\n"+ | |
"}"; | |
logger.info("analyzerSettingsource:::\n"+analyzerSettingsource); | |
CreateIndexResponse createindexresponseactionGet = client.admin().indices().create(new CreateIndexRequest(indexName).settings(analyzerSettingsource)).actionGet(); | |
logger.info("createindexresponseactionGet::"+createindexresponseactionGet); | |
String indexAnalyzerMapping= "{\n"+ | |
"\""+indexType+"\" : {\n"+ | |
" \"index_analyzer\" : \"customindexanalyzer\",\n"+ | |
"\"search_analyzer\" : \"customsearchanalyzer\"\n"+ | |
"}\n"+ | |
"}"; | |
logger.info("indexAnalyzerMapping:::"+indexAnalyzerMapping); | |
PutMappingResponse actionGet = client.admin().indices().preparePutMapping(indexName).setType(indexType).setSource(indexAnalyzerMapping).execute().actionGet(); | |
boolean acknowledged = actionGet.getAcknowledged(); | |
logger.info("put mapping acknowledged::"+acknowledged); | |
client.prepareIndex("_river", "myriver", "_meta").setSource(jsonBuilder().startObject().field("type", "rabbitmq").field("rabbitmq", rabbitmqSettings).field("index", indexSettings).endObject()).execute().actionGet(); | |
Iterator iterator = jsonDoc.iterator(); | |
while (iterator.hasNext()) | |
{ | |
String jsondoc = iterator.next().toString(); | |
JSONObject jsonObject = new JSONObject(jsondoc); | |
String uniqueId = jsonObject.get(primaryColumnName.toString()).toString(); | |
logger.info("uniqueId--" + uniqueId); | |
String message = "{ \"index\" : { \"_index\" :\"" + indexName + "\", \"_type\" :\"" + indexType + "\", \"_id\" :\"" + uniqueId + "\"}}\n" | |
+"{ \"" + indexType + "\" :" + jsondoc + "}\n" | |
+"{ \"create\" : { \"_index\" :\"" + indexName + "\", \"_type\" :\"" + indexType + "\", \"_id\" :\"" + uniqueId + "\" }}\n" | |
+"{ \"" + indexType + "\" :" + jsondoc + "}"; | |
logger.info("River Message-" + message); | |
channel.basicPublish(exchange, routing_key, null, message.getBytes()); | |
logger.info("sucessfully publish message in rabbitmqriver"); | |
} | |
riverStatus = true; | |
} catch (JSONException ex) { | |
logger.error(ex); | |
throw new CustomException(ex.getMessage(), getClass()); | |
} catch (IOException ex) { | |
logger.error(ex); | |
throw new CustomException(ex.getMessage(), getClass()); | |
} catch (ValueNotFoundException ex) { | |
logger.error(ex); | |
throw new CustomException(ex.getMessage(), getClass()); | |
} | |
catch(RemoteTransportException ex) | |
{ | |
logger.error(ex); | |
throw new CustomException(ex.getMessage(), getClass()); | |
} | |
catch(Exception ex) | |
{ | |
logger.error(ex); | |
throw new CustomException(ex.getMessage(), getClass()); | |
} | |
return riverStatus; | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
String data="inc"; | |
String indexColumn="Description"; | |
//prepare query string builder | |
QueryStringQueryBuilder querystring = new QueryStringQueryBuilder(data); | |
SearchResponse searchResponse = client.prepareSearch(indexName).setTypes(indexType).setSearchType(SearchType.DEFAULT) .setQuery(querystring.field(indexColumn)) | |
.setExplain(true).setSize(100).execute().actionGet(); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
is | |
the | |
an | |
a | |
to | |
and | |
or | |
of | |
for | |
Inc | |
computer |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment