Skip to content

Instantly share code, notes, and snippets.

@keke1123
Last active April 9, 2023 08:37
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 2 You must be signed in to fork a gist
  • Save keke1123/7f130b62b1ea2c321aa7 to your computer and use it in GitHub Desktop.
Save keke1123/7f130b62b1ea2c321aa7 to your computer and use it in GitHub Desktop.
Elasticsearch for Java API Simple Example
/**
* Elasticsearch Java API Example
*/
import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
import org.elasticsearch.action.admin.indices.mapping.delete.DeleteMappingResponse;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingResponse;
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsResponse;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.count.CountResponse;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHitField;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import static org.elasticsearch.index.query.QueryBuilders.*;
/**
* Created by SteveShin on 2015-05-08.
*/
public class EsSample {
Logger logger = LoggerFactory.getLogger(EsSample.class);
static final ForkJoinPool forkJoinPool = new ForkJoinPool();
static final int bytes = 1000;
static String indexName = "size_sample";
static String indexType = "sample";
static int amount = 1000000;
static final Map<String, Object> beforeSet = new HashMap<String, Object>(){
{
put("index.refresh_interval",-1);
put("index.number_of_replicas",0);
}
};
static final Map<String, Object> afterSet = new HashMap<String, Object>(){
{
put("index.refresh_interval","10s");
put("index.number_of_replicas",0);
}
};
/**
 * Creates the index (schema) when it does not exist yet.
 *
 * <p>The index name is echoed first. If the index is missing it is created with the
 * {@code beforeSet} settings and, once the creation is acknowledged, a mapping for the static
 * {@code indexType} is registered. Both acknowledgements are printed to standard output.
 * Nothing else happens when the index already exists.
 *
 * @param client    connected Elasticsearch client
 * @param indexName name of the index to create
 */
public static void createIndex(Client client, String indexName) {
    System.out.println(indexName);
    boolean alreadyPresent = client.admin().indices()
            .prepareExists(indexName)
            .execute().actionGet()
            .isExists();
    if (alreadyPresent) {
        return;
    }
    CreateIndexRequestBuilder creation = client.admin().indices().prepareCreate(indexName);
    creation.setSettings(beforeSet);
    CreateIndexResponse created = creation.execute().actionGet();
    if (created.isAcknowledged()) {
        // _all and _source are switched off; "body" is a stored string, "head" an object.
        String mappingJson = "{\"_all\":{\"enabled\":false},"
                + "\"_source\":{\"enabled\":false},"
                + "\"properties\":{"
                + "\"body\":{\"type\":\"string\",\"store\":true},"
                + "\"head\":{\"type\":\"object\",\"store\":false}"
                + "}}";
        PutMappingResponse mapped = client.admin().indices()
                .preparePutMapping(indexName)
                .setType(indexType)
                .setSource(mappingJson)
                .execute().actionGet();
        System.out.println("mapping... " + mapped.isAcknowledged());
    }
    System.out.println("CREATE index: " + created.isAcknowledged());
}
/**
 * Deletes a single document and waits for the request to finish.
 *
 * @param client     connected Elasticsearch client
 * @param indexName  index holding the document
 * @param typeName   mapping type of the document
 * @param documentId id of the document to delete
 */
public static void deleteDocument(Client client, String indexName, String typeName, String documentId) {
    // The response is not inspected; the call only blocks until the delete completes.
    client.prepareDelete(indexName, typeName, documentId)
            .execute()
            .actionGet();
}
/**
 * Deletes the mapping of one type from an index and waits for the request to finish.
 *
 * @param client    connected Elasticsearch client
 * @param indexName index that owns the type
 * @param typeName  type whose mapping is deleted
 */
public static void deleteType(Client client, String indexName, String typeName) {
    // The response is not inspected; the call only blocks until the request completes.
    client.admin().indices()
            .prepareDeleteMapping(indexName)
            .setType(typeName)
            .execute()
            .actionGet();
}
/**
 * Returns a new list containing the entries of {@code logs} repeated {@code loop} times, in order.
 *
 * <p>The loop count and the resulting size are printed to standard output. The size is computed
 * as a {@code long} so the message stays correct when {@code logs.size() * loop} exceeds the
 * {@code int} range, and the result list is presized to avoid repeated re-allocation.
 *
 * @param logs entries to repeat; must not be {@code null}
 * @param loop number of repetitions; zero or a negative value yields an empty list
 * @return a new mutable list; {@code logs} itself is not modified
 */
public static List<String> pumpup(List<String> logs, int loop) {
    long total = (long) logs.size() * loop;
    System.out.println("Loop:" + loop + ", total size: " + total);
    // Clamp the capacity hint: it may not be negative and must stay within array limits.
    int capacity = (int) Math.max(0L, Math.min(total, (long) Integer.MAX_VALUE - 8));
    List<String> result = new ArrayList<String>(capacity);
    for (int i = 0; i < loop; i++) {
        result.addAll(logs);
    }
    return result;
}
/**
 * Builds a {@link BulkProcessor} with the experimental settings below (10000 actions, 200 MB,
 * 10 s flush interval, 10 concurrent requests) and closes it again.
 *
 * <p>No documents are added. The processor is closed before returning so that it is not left
 * behind unreleased, which the previous version did.
 *
 * @param client connected Elasticsearch client
 */
public static void putTest(Client client) throws IOException, InterruptedException, ExecutionException {
    BulkProcessor bulkProcessor = BulkProcessor.builder(client,
            new BulkProcessor.Listener() {
                @Override
                public void beforeBulk(long executionId, BulkRequest request) {
                    // intentionally silent
                }

                @Override
                public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
                    System.out.println(" done... { id:" + executionId + ", time:" + response.getTookInMillis()
                            + ", size:" + response.getItems().length + "}");
                }

                @Override
                public void afterBulk(long executionId, BulkRequest request, Throwable fail) {
                    fail.printStackTrace();
                    System.out.println(" fail... { id:" + executionId + ", error:" + fail.getMessage() + "}");
                }
            })
            .setBulkActions(10000)
            .setBulkSize(new ByteSizeValue(200, ByteSizeUnit.MB))
            .setFlushInterval(TimeValue.timeValueSeconds(10))
            .setConcurrentRequests(10)
            .build();
    // Nothing is queued, so closing only releases the processor's resources.
    bulkProcessor.close();
}
/**
 * Applies {@code settings} to the index named by the static {@code indexName} and prints
 * whether the update was acknowledged.
 *
 * @param client   connected Elasticsearch client
 * @param settings index settings to apply
 */
public static void indexSetting(Client client, Map<String, Object> settings) {
    boolean acknowledged = client.admin().indices()
            .prepareUpdateSettings(indexName)
            .setSettings(settings)
            .execute()
            .actionGet()
            .isAcknowledged();
    System.out.println("update set:" + acknowledged);
}
/**
 * Bulk-indexes parsed sample messages into {@code indexName}/{@code indexType}.
 *
 * <p>The sample messages come from {@code TtDummyRvMsgGenerate.read()} and are parsed once with
 * {@code TtRvMsgParser}. The parsed documents are then submitted round-robin with ids
 * {@code "1"} .. {@code amount} through a {@link BulkProcessor}. The time spent submitting is
 * printed, the processor is drained (up to 10 minutes), and finally the {@code afterSet}
 * index settings are applied.
 *
 * <p>Differences from the previous version:
 * <ul>
 *   <li>an empty sample list no longer causes an endless loop;</li>
 *   <li>exactly {@code amount} documents are submitted instead of rounding up to a multiple of
 *       the sample count;</li>
 *   <li>the processor is closed even when an exception occurs;</li>
 *   <li>{@code afterSet} is applied after the bulk queue has been drained, not while requests
 *       may still be in flight.</li>
 * </ul>
 *
 * @param client connected Elasticsearch client
 * @throws InterruptedException if interrupted while waiting for the processor to drain
 */
public static void put(Client client) throws IOException, InterruptedException, ExecutionException {
    BulkProcessor bulkProcessor = BulkProcessor.builder(client,
            new BulkProcessor.Listener() {
                @Override
                public void beforeBulk(long executionId, BulkRequest request) {
                    // intentionally silent
                }

                @Override
                public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
                    System.out.println(" done... { id:" + executionId + ", time:" + response.getTookInMillis()
                            + ", size:" + response.getItems().length + "}");
                }

                @Override
                public void afterBulk(long executionId, BulkRequest request, Throwable fail) {
                    fail.printStackTrace();
                    System.out.println(" fail... { id:" + executionId + ", error:" + fail.getMessage() + "}");
                }
            })
            .setBulkActions(10000)
            .setBulkSize(new ByteSizeValue(150, ByteSizeUnit.MB))
            .setConcurrentRequests(3)
            .build();
    try {
        // Parse the sample logs once; the parsed maps are reused for every document.
        List<String> message = pumpup(TtDummyRvMsgGenerate.read(), 1);
        TtRvMsgParser parser = new TtRvMsgParser();
        List<Map<String, Object>> listMapList = new ArrayList<>();
        for (String msg : message) {
            parser.init(msg);
            Map<String, Object> listMap = parser.result();
            listMapList.add(listMap);
        }

        long start = System.currentTimeMillis();
        int sampleCount = listMapList.size();
        if (sampleCount == 0) {
            // Without this guard the id loop could never advance.
            System.out.println("No sample documents were parsed; nothing to index.");
        } else {
            for (int id = 1; id <= amount; id++) {
                Map<String, Object> listMap = listMapList.get((id - 1) % sampleCount);
                bulkProcessor.add(new IndexRequest(indexName, indexType, String.valueOf(id)).source(listMap));
            }
        }
        long end = System.currentTimeMillis();
        System.out.println("-------------------- ELAPSED:" + (end - start) + " millis");
    } finally {
        // Flush whatever is still queued and release the processor.
        boolean drained = bulkProcessor.awaitClose(10, TimeUnit.MINUTES);
        if (!drained) {
            System.out.println("Bulk processor did not finish within 10 minutes");
        }
    }
    indexSetting(client, afterSet);
}
/**
 * Fetches one document by id and prints its index, type, id, version and source map between
 * two separator lines.
 *
 * @param client     connected Elasticsearch client
 * @param indexName  index holding the document
 * @param typeName   mapping type of the document
 * @param documentId id of the document to fetch
 */
public static void getDocument(Client client, String indexName, String typeName, String documentId) {
    final String separator = "------------------------------";
    GetResponse fetched = client.prepareGet(indexName, typeName, documentId)
            .execute()
            .actionGet();
    Map<String, Object> body = fetched.getSource();

    System.out.println(separator);
    System.out.println("Index: " + fetched.getIndex());
    System.out.println("Type: " + fetched.getType());
    System.out.println("Id: " + fetched.getId());
    System.out.println("Version: " + fetched.getVersion());
    System.out.println(body);
    System.out.println(separator);
}
/**
 * Counts the documents matching the query string {@code field[0]:value} and prints the count.
 *
 * @param client    connected Elasticsearch client
 * @param IndexName index to count in
 * @param typeName  mapping type to restrict the count to
 * @param value     text to look for
 * @param field     field names; only the first one is used, so at least one is required
 */
public static void fullTextSearchCount(Client client, String IndexName, String typeName, String value, String... field) {
    String queryText = field[0] + ":" + value;
    QueryBuilder queryBuilder = queryStringQuery(queryText);
    CountResponse counted = client.prepareCount(IndexName)
            .setTypes(typeName)
            .setQuery(queryBuilder)
            .execute()
            .actionGet();
    long matches = counted.getCount();
    System.out.println("\tCurrent More Like This count: " + matches + "");
}
/**
 * Runs a match query for {@code value} on {@code field[0]} and prints the results.
 *
 * <p>Up to 1000 hits are requested with only {@code field[0]} returned as a field. The total
 * hit count, the number of hits in this page, and every returned field value are printed to
 * standard output.
 *
 * @param client    connected Elasticsearch client
 * @param indexName index to search
 * @param typeName  mapping type to restrict the search to
 * @param value     text to match
 * @param field     field names; only the first one is used, so at least one is required
 */
public static void fullTextSearchDocument(Client client, String indexName, String typeName, String value, String... field) {
    String targetField = field[0];
    QueryBuilder fieldQueryBuilder = matchQuery(targetField, value);

    SearchResponse searchResponse = client.prepareSearch(indexName)
            .setTypes(typeName)
            .addFields(targetField)
            .setSearchType(SearchType.QUERY_THEN_FETCH)
            .setQuery(fieldQueryBuilder)
            .setFrom(0)
            .setSize(1000)
            .setExplain(false)
            .execute()
            .actionGet();

    SearchHit[] page = searchResponse.getHits().getHits();
    System.out.println(searchResponse.getHits().getTotalHits() + " total count");
    System.out.println("\tCurrent More Like This results: " + page.length + "");

    // Print every field value that came back with each hit.
    for (int i = 0; i < page.length; i++) {
        for (SearchHitField hitField : page[i].fields().values()) {
            System.out.println(hitField.getValue().toString());
        }
    }
}
/**
 * Tells whether the given index is currently in the OPEN state.
 *
 * <p>The cluster state is requested for {@code indexName} only. When the returned metadata
 * holds no entry for that index the method returns {@code false}; the previous version threw a
 * {@link NullPointerException} in that case.
 *
 * @param client    connected Elasticsearch client
 * @param indexName index to inspect
 * @return {@code true} only if index metadata exists and its state is OPEN
 */
public static boolean isOpen(Client client, String indexName) {
    ClusterState clusterState = client.admin().cluster().prepareState().setIndices(indexName).get().getState();
    IndexMetaData metaData = clusterState.getMetaData().index(indexName);
    if (metaData == null) {
        return false;
    }
    // Compare the enum constant directly rather than its string form.
    return metaData.getState() == IndexMetaData.State.OPEN;
}
/**
 * Runs the query string {@code field[0]:value} against an index/type and prints how many hits
 * were returned (at most 1000).
 *
 * <p>The previous version also built a match query and a multi-match query that were never
 * executed, and looped over the hits without using them; that dead code has been removed.
 *
 * @param client    connected Elasticsearch client
 * @param indexName index to search
 * @param typeName  mapping type to restrict the search to
 * @param value     text to look for
 * @param field     field names; only the first one is used, so at least one is required
 */
public static void searchDocument(Client client, String indexName, String typeName, String value, String... field) {
    QueryBuilder stringQuery = queryStringQuery(field[0] + ":" + value);
    SearchResponse searchResponse = client.prepareSearch(indexName)
            .setTypes(typeName)
            .setSearchType(SearchType.QUERY_THEN_FETCH)
            .setQuery(stringQuery)
            .setFrom(0).setSize(1000) // first page, at most 1000 hits
            .setExplain(false)
            .execute().actionGet();
    SearchHit[] result = searchResponse.getHits().getHits();
    System.out.println("Current Match results: " + result.length + "--------------------");
}
/**
 * Counts the documents of one type in an index, using a term query on {@code _type}, and
 * prints the result.
 *
 * @param client connected Elasticsearch client
 * @param index  index to count in
 * @param type   mapping type whose documents are counted
 */
public static void count(Client client, String index, String type) {
    QueryBuilder byType = termQuery("_type", type);
    long total = client.prepareCount(index)
            .setQuery(byType)
            .execute()
            .actionGet()
            .getCount();
    System.out.println("type:" + type + "`s total count: " + total);
}
/**
 * Entry point: connects a transport client to the hard-coded cluster nodes and runs the sample.
 *
 * <p>Currently only the first full-text search is executed, because the {@code System.exit(0)}
 * call right after it ends the JVM. The statements after it (index creation, bulk load and the
 * timed search loops) are kept so the exit call can simply be removed to run the whole
 * benchmark.
 *
 * <p>The client is now closed in a {@code finally} block, so it is released even when one of
 * the calls throws.
 *
 * @param args ignored
 */
public static void main(String[] args) throws InterruptedException, ExecutionException, IOException {
    // Connection settings for the transport client.
    Settings settings = ImmutableSettings.settingsBuilder()
            .put("action.auto_create_index", true)
            .put("client.transport.sniff", true)
            .put("client.transport.ping_timeout", "60s")
            .put("network.tcp.block", true)
            .put("cluster.name", "elasticsearch").build();
    // The addresses use port 9300 (native transport); 9200 would be the REST port.
    Client client = new TransportClient(settings)
            .addTransportAddresses(
                    new InetSocketTransportAddress("192.168.10.210", 9300),
                    new InetSocketTransportAddress("192.168.10.251", 9300),
                    new InetSocketTransportAddress("192.168.10.252", 9300)
            );
    try {
        indexName = "disable_source";
        indexType = "test";
        amount = 10000;
        fullTextSearchDocument(client, indexName, indexType, "tmtm", "body");
        // NOTE(review): this ends the JVM here; nothing below runs until the call is removed.
        System.exit(0);

        createIndex(client, indexName);
        put(client);
        System.out.println("PUT END");

        List<String> query = Arrays.asList(
                "*ASDF1186*"
                , "*QLEGASD1800*"
                , "*WEFLKJASLD118*"
                , "*JGKLJDLSF1006*"
                , "*SLD2186*"
                , "*DLFKJQWEF1686*"
                , "*LKJFASD2718*"
                , "*LKJGQLEGASD235*"
                , "*KJQWEF1361*"
                , "*QWEFLKJASLD2*"
                , "*KLJGKLJDLSF ASDLKJFASD1*"
        );
        String index = "test3";
        String type = "rv_10000";
        System.out.println(type + " full text search start");
        System.out.println("--------------------------------------------------------------------------------------------------");
        // Ten passes over the query list, timing each search individually.
        for (int i = 0; i <= 9; i++) {
            System.out.println("LOOP" + i + "test start");
            for (String q : query) {
                long start = System.currentTimeMillis();
                fullTextSearchDocument(client, index, type, q, "body");
                long end = System.currentTimeMillis();
                System.out.println("\t--------------End search : " + "query:" + q + ", " + (end - start) + " time elapsed");
            }
            System.out.println("end of test---------------------------------------------------------------------------------------");
        }
    } finally {
        // Close the connection on every path that returns or throws.
        client.close();
    }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment