Last active
April 9, 2023 08:37
-
-
Save keke1123/7f130b62b1ea2c321aa7 to your computer and use it in GitHub Desktop.
Simple example of the Elasticsearch Java API
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
* Elasticsearch Java API Example | |
*/ | |
import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder; | |
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; | |
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse; | |
import org.elasticsearch.action.admin.indices.mapping.delete.DeleteMappingResponse; | |
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingResponse; | |
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsResponse; | |
import org.elasticsearch.action.bulk.BulkProcessor; | |
import org.elasticsearch.action.bulk.BulkRequest; | |
import org.elasticsearch.action.bulk.BulkResponse; | |
import org.elasticsearch.action.count.CountResponse; | |
import org.elasticsearch.action.delete.DeleteResponse; | |
import org.elasticsearch.action.get.GetResponse; | |
import org.elasticsearch.action.index.IndexRequest; | |
import org.elasticsearch.action.search.SearchResponse; | |
import org.elasticsearch.action.search.SearchType; | |
import org.elasticsearch.client.Client; | |
import org.elasticsearch.client.transport.TransportClient; | |
import org.elasticsearch.cluster.ClusterState; | |
import org.elasticsearch.cluster.metadata.IndexMetaData; | |
import org.elasticsearch.common.settings.ImmutableSettings; | |
import org.elasticsearch.common.settings.Settings; | |
import org.elasticsearch.common.transport.InetSocketTransportAddress; | |
import org.elasticsearch.common.unit.ByteSizeUnit; | |
import org.elasticsearch.common.unit.ByteSizeValue; | |
import org.elasticsearch.common.unit.TimeValue; | |
import org.elasticsearch.index.query.QueryBuilder; | |
import org.elasticsearch.search.SearchHit; | |
import org.elasticsearch.search.SearchHitField; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
import java.io.IOException; | |
import java.util.*; | |
import java.util.concurrent.ExecutionException; | |
import java.util.concurrent.ForkJoinPool; | |
import java.util.concurrent.TimeUnit; | |
import static org.elasticsearch.index.query.QueryBuilders.*; | |
/** | |
* Created by SteveShin on 2015-05-08. | |
*/ | |
public class EsSample { | |
/** Class logger. Static so that the static methods of this class are able to use it. */
static final Logger logger = LoggerFactory.getLogger(EsSample.class);

/** Shared fork/join pool; not referenced by any method visible in this file. */
static final ForkJoinPool forkJoinPool = new ForkJoinPool();

/** Not referenced by any method visible in this file. */
static final int bytes = 1000;

/** Index that {@link #indexSetting} and {@link #put} operate on; reassigned by {@code main}. */
static String indexName = "size_sample";

/** Document type used for the mapping and for indexed documents; reassigned by {@code main}. */
static String indexType = "sample";

/** Number of documents {@link #put} indexes; reassigned by {@code main}. */
static int amount = 1000000;

/** Index settings passed to the create-index request in {@code createIndex}. */
static final Map<String, Object> beforeSet = new HashMap<String, Object>();

/** Index settings applied by {@code put} once the documents have been handed to the bulk processor. */
static final Map<String, Object> afterSet = new HashMap<String, Object>();

// Populate the maps in a static initializer instead of "double-brace" initialization,
// which would create an extra anonymous HashMap subclass per map.
static {
    beforeSet.put("index.refresh_interval", -1);
    beforeSet.put("index.number_of_replicas", 0);

    afterSet.put("index.refresh_interval", "10s");
    afterSet.put("index.number_of_replicas", 0);
}
//create Index(schema) | |
public static void createIndex(Client client, String indexName) { | |
System.out.println(indexName); | |
IndicesExistsResponse res = client.admin().indices().prepareExists(indexName).execute().actionGet(); | |
if (!res.isExists()) { | |
CreateIndexRequestBuilder createIndexRequestBuilder = client.admin().indices().prepareCreate(indexName); | |
createIndexRequestBuilder.setSettings(beforeSet); | |
CreateIndexResponse response = createIndexRequestBuilder.execute().actionGet(); | |
if(response.isAcknowledged()) { | |
String mappingJson = "{" + | |
"\"_all\":{\"enabled\":false}" + | |
",\"_source\":{\"enabled\":false}" + | |
",\"properties\":{" + | |
"\"body\":{\"type\":\"string\",\"store\":true}" + | |
",\"head\":{\"type\":\"object\",\"store\":false}" + | |
"}}"; | |
PutMappingResponse mappingResponse = client.admin().indices() | |
.preparePutMapping(indexName) | |
.setType(indexType) | |
.setSource(mappingJson) | |
.execute().actionGet(); | |
System.out.println("mapping... " + mappingResponse.isAcknowledged()); | |
} | |
System.out.println("CREATE index: " + response.isAcknowledged()); | |
} | |
} | |
//delete document | |
public static void deleteDocument(Client client, String indexName, String typeName, String documentId) { | |
DeleteResponse response = client.prepareDelete(indexName, typeName, documentId) | |
.execute()//.setOperationThreades(false)//run on this thread | |
.actionGet(); | |
} | |
//delete type | |
public static void deleteType(Client client, String indexName, String typeName) { | |
DeleteMappingResponse response = client.admin().indices().prepareDeleteMapping(indexName).setType(typeName).execute().actionGet(); | |
} | |
public static List<String> pumpup(List<String> logs, int loop) { | |
List<String> result = new ArrayList<String>(); | |
System.out.println("Loop:" + loop + ", total size: " + logs.size() * loop); | |
for (int i = 0; i < loop; i++) { | |
result.addAll(logs); | |
} | |
return result; | |
} | |
public static void putTest(Client client) throws IOException, InterruptedException, ExecutionException { | |
//prepare Bulk Processor | |
BulkProcessor bulkProcessor = BulkProcessor.builder(client, | |
new BulkProcessor.Listener() { | |
@Override | |
public void beforeBulk(long executionId, BulkRequest request) { | |
//System.out.println("Bulk process ready... : { id:" + executionId + ", size:" + request.requests().size() + "}\t"); | |
} | |
@Override | |
public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { | |
System.out.println(" done... { id:" + executionId + ", time:" + response.getTookInMillis() | |
+ ", size:" + response.getItems().length + "}"); | |
} | |
@Override | |
public void afterBulk(long executionId, BulkRequest request, Throwable fail) { | |
fail.printStackTrace(); | |
System.out.println(" fail... { id:" + executionId + ", error:" + fail.getMessage() + "}"); | |
} | |
}) | |
.setBulkActions(10000) | |
.setBulkSize(new ByteSizeValue(200, ByteSizeUnit.MB)) | |
.setFlushInterval(TimeValue.timeValueSeconds(10)) | |
.setConcurrentRequests(10) | |
.build(); | |
} | |
public static void indexSetting(Client client, Map<String, Object> settings){ | |
UpdateSettingsResponse response = client.admin().indices().prepareUpdateSettings(indexName).setSettings(settings).execute().actionGet(); | |
System.out.println("update set:" + response.isAcknowledged()); | |
} | |
public static void put(Client client) throws IOException, InterruptedException, ExecutionException { | |
/* //get indice(schema) | |
GetMappingsResponse res = client.admin().indices().getMappings(new GetMappingsRequest().indices(indexName)).actionGet(); | |
ImmutableOpenMap<String, MappingMetaData> mapping = res.mappings().get(indexName); | |
for (ObjectObjectCursor<String, MappingMetaData> c : mapping) { | |
System.out.println("index find: " + c.key + " = " + c.value.source()); | |
}*/ | |
// indexSetting(client, beforeSet); | |
//indexSetting(client, beforeSet); | |
//CreateIndexResponse createIndexResponse = client.admin().indices().create(new CreateIndexRequestBuilder. .createIndexRequest()).actionGet(); | |
//prepare Bulk Processor | |
BulkProcessor bulkProcessor = BulkProcessor.builder(client, | |
new BulkProcessor.Listener() { | |
@Override | |
public void beforeBulk(long executionId, BulkRequest request) { | |
//System.out.println("Bulk process ready... : { id:" + executionId + ", size:" + request.requests().size() + "}\t"); | |
} | |
@Override | |
public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { | |
System.out.println(" done... { id:" + executionId + ", time:" + response.getTookInMillis() | |
+ ", size:" + response.getItems().length + "}"); | |
} | |
@Override | |
public void afterBulk(long executionId, BulkRequest request, Throwable fail) { | |
fail.printStackTrace(); | |
System.out.println(" fail... { id:" + executionId + ", error:" + fail.getMessage() + "}"); | |
} | |
}) | |
.setBulkActions(10000) | |
.setBulkSize(new ByteSizeValue(150, ByteSizeUnit.MB)) | |
//.setFlushInterval(TimeValue.timeValueSeconds(120)) | |
.setConcurrentRequests(3) | |
.build(); | |
//get sample logs | |
List<String> message = pumpup(TtDummyRvMsgGenerate.read(), 1); | |
TtRvMsgParser parser = new TtRvMsgParser(); | |
List<Map<String, Object>> listMapList = new ArrayList<>(); | |
for (String msg : message) { | |
parser.init(msg);//.println(); | |
Map<String, Object> listMap = parser.result(); | |
listMapList.add(listMap); | |
} | |
int loop = 1; | |
long start = new Date().getTime(); | |
while (loop <= amount) { | |
//use not parsed | |
// for(final String msg : message){ | |
// @SuppressWarnings("unchecked") | |
// Map<String, String> fullMsg = new HashMap<String, String>(){{ | |
// put("body", TtDummyRvMsgGenerate.increaseByte(msg, 1000)); | |
// }}; | |
// bulkProcessor.add(new IndexRequest(indexName, indexType).source(fullMsg)); | |
// loop++; | |
// } | |
//use parsed | |
for (Map<String, Object> listMap : listMapList) { | |
//String a = (String)listMap.get("body"); | |
//System.out.println(a); | |
//listMap.put("body", ((String) listMap.get("body")).concat(TtDummyRvMsgGenerate.getSampleWords(15))); | |
//listMap.put("body", ((String) listMap.get("body")).concat(TtDummyRvMsgGenerate.randomKey(15))); | |
//listMap.put("body", TtDummyRvMsgGenerate.increaseByte((String) listMap.get("body"), 500)); | |
//listMap.put("loop", loop); | |
bulkProcessor.add(new IndexRequest(indexName, indexType, loop+"").source(listMap)); | |
loop++; | |
} | |
//System.exit(0); | |
} | |
long end = new Date().getTime(); | |
System.out.println("-------------------- ELAPSED:" + (end - start) + " millis"); | |
indexSetting(client, afterSet); | |
//Bulk Process close | |
bulkProcessor.awaitClose(10, TimeUnit.MINUTES); | |
} | |
public static void getDocument(Client client, String indexName, String typeName, String documentId) { | |
GetResponse getResponse = client.prepareGet(indexName, typeName, documentId).execute().actionGet(); | |
Map<String, Object> source = getResponse.getSource(); | |
System.out.println("------------------------------"); | |
System.out.println("Index: " + getResponse.getIndex()); | |
System.out.println("Type: " + getResponse.getType()); | |
System.out.println("Id: " + getResponse.getId()); | |
System.out.println("Version: " + getResponse.getVersion()); | |
System.out.println(source); | |
System.out.println("------------------------------"); | |
} | |
public static void fullTextSearchCount(Client client, String IndexName, String typeName, String value, String... field) { | |
QueryBuilder queryBuilder = | |
//moreLikeThisQuery(field).likeText(value).minTermFreq(1).maxQueryTerms(1000); | |
//wildcardQuery(field[0], value); | |
queryStringQuery(field[0] + ":" + value); | |
CountResponse response = client.prepareCount(IndexName).setTypes(typeName) | |
.setQuery(queryBuilder) | |
.execute().actionGet(); | |
System.out.println("\tCurrent More Like This count: " + response.getCount() + ""); | |
} | |
public static void fullTextSearchDocument(Client client, String indexName, String typeName, String value, String... field) { | |
QueryBuilder fieldQueryBuilder = | |
//termQuery(field[0], value); | |
matchQuery(field[0], value); | |
//new WildcardQueryBuilder(field[0], value); | |
//moreLikeThisQuery(field).likeText(value).minTermFreq(1).maxQueryTerms(25); | |
//wildcardQuery(field[0], value); | |
//queryStringQuery(field[0] + ":" + value); | |
//multiMatchQuery(value, field); | |
//SortBuilder sortBuilder = SortBuilders.fieldSort(field[0]); | |
SearchResponse searchResponse = client.prepareSearch(indexName).setTypes(typeName).addFields(field[0]) | |
.setSearchType(SearchType.QUERY_THEN_FETCH)//.addSort(sortBuilder) | |
.setQuery(fieldQueryBuilder)//.addSort(SortBuilders.fieldSort("loop").order(SortOrder.DESC)) | |
.setFrom(0).setSize(1000).setExplain(false).execute().actionGet(); | |
SearchHit[] result = searchResponse.getHits().getHits(); | |
System.out.println(searchResponse.getHits().getTotalHits() + " total count"); | |
//get result & print | |
System.out.println("\tCurrent More Like This results: " + result.length + ""); | |
for (SearchHit hit : result) { | |
Set<String> keys = hit.fields().keySet(); | |
Iterator<String> keyItr = keys.iterator(); | |
while(keyItr.hasNext()){ | |
SearchHitField hitField = hit.field(keyItr.next()); | |
System.out.println(hitField.getValue().toString()); | |
} | |
// Map<String, Object> r = hit.getSource(); | |
// System.out.println(r); | |
} | |
} | |
public static boolean isOpen(Client client, String indexName){ | |
ClusterState clusterState = client.admin().cluster().prepareState().setIndices(indexName).get().getState(); | |
IndexMetaData metaData = clusterState.getMetaData().index(indexName); | |
final String open = "OPEN"; | |
return metaData.getState() != null && metaData.getState().toString().equals(open); | |
} | |
public static void searchDocument(Client client, String indexName, String typeName, String value, String... field) { | |
//query using query builder | |
QueryBuilder matchQuery = matchQuery(field[0], value); | |
//System.out.println(matchQuery.toString()); //query json println | |
QueryBuilder multiMatchQuery = multiMatchQuery(value, field); | |
QueryBuilder stringQuery = queryStringQuery(field[0] + ":" + value); | |
//search query execution | |
SearchResponse searchResponse = client.prepareSearch(indexName) | |
.setTypes(typeName) | |
.setSearchType(SearchType.QUERY_THEN_FETCH) //search type. hold | |
.setQuery(stringQuery) | |
.setFrom(0).setSize(1000) //limit | |
.setExplain(false) //query explain | |
.execute().actionGet(); //execute | |
SearchHit[] result = searchResponse.getHits().getHits(); | |
//get result & print | |
System.out.println("Current Match results: " + result.length + "--------------------"); | |
for (SearchHit hit : result) { | |
Map<String, Object> r = hit.getSource(); | |
//System.out.println(r); | |
} | |
} | |
public static void count(Client client, String index, String type) { | |
CountResponse response = client.prepareCount(index) | |
.setQuery(termQuery("_type", type)) | |
.execute().actionGet(); | |
System.out.println("type:" + type + "`s total count: " + response.getCount()); | |
} | |
public static void main(String[] args) throws InterruptedException, ExecutionException, IOException { | |
//ES connection set | |
Settings settings = ImmutableSettings.settingsBuilder() | |
.put("action.auto_create_index", true) | |
.put("client.transport.sniff", true) | |
.put("client.transport.ping_timeout", "60s") | |
.put("network.tcp.block", true) | |
//.put("node.name", "ES3") | |
.put("cluster.name", "elasticsearch").build(); | |
//ES client | |
//default native transport : 9300 | |
//default web/rest port : 9200 | |
Client client = new TransportClient(settings) | |
.addTransportAddresses( | |
// new InetSocketTransportAddress("192.168.10.229", 9300) | |
new InetSocketTransportAddress("192.168.10.210", 9300) | |
, new InetSocketTransportAddress("192.168.10.251", 9300) | |
, new InetSocketTransportAddress("192.168.10.252", 9300) | |
); | |
// try { | |
// ClusterStatsResponse res = client.admin().cluster().prepareClusterStats().execute().actionGet(); | |
// | |
// PutRepositoryResponse putRepositoryResponse = client.admin().cluster().preparePutRepository("hdfs_repo1") | |
// .setType("hdfs") | |
// .setSettings( | |
// ImmutableSettings.builder() | |
// .put("uri", "hdfs://192.168.10.251:2181/") | |
// .put("path", "es/snapshot") | |
// .put("concurrent_streams", 4) | |
// .put("compress", true) | |
// .put("chunk_size", "40mb")).get(); | |
// System.out.println("repository put response:" + putRepositoryResponse.isAcknowledged()); | |
// | |
// CreateSnapshotResponse createSnapshotResponse = client.admin().cluster().prepareCreateSnapshot("hdfs_repo1", "map1_2million") | |
// .setIncludeGlobalState(false) | |
// .setIndices("map1") | |
// .setRepository("hdfs_repo1") | |
// .setSettings(ImmutableSettings.builder().put("compress", true)) | |
// .setWaitForCompletion(true) | |
// .get(); | |
// System.out.println("success:" + createSnapshotResponse.getSnapshotInfo().successfulShards()); | |
// | |
// GetSnapshotsResponse getSnapshotsResponse = client.admin().cluster().prepareGetSnapshots("hdfs_repo1").get(); | |
// ImmutableList<SnapshotInfo> snapshotInfos = getSnapshotsResponse.getSnapshots(); | |
// for(SnapshotInfo info : snapshotInfos){ | |
// System.out.println(info.name()); | |
// } | |
// | |
// RestoreSnapshotResponse restoreSnapshotResponse = client.admin().cluster().prepareRestoreSnapshot("hdfs_repo1", "map1_2million") | |
// .setIndexSettings( | |
// ImmutableSettings.builder().put("index.number_of_replicas", 0)) | |
// .setIndices("map1") | |
// .setWaitForCompletion(true) | |
// .setRenamePattern("map[.*]") | |
// .setRenameReplacement("restore_map$1") | |
// .setIgnoreIndexSettings("index.refresh_interval", "") | |
// .get(); | |
// | |
// | |
// System.out.println(res.getClusterNameAsString()); | |
indexName = "disable_source"; | |
indexType = "test"; | |
amount = 10000; | |
fullTextSearchDocument(client, indexName, indexType, "tmtm", "body"); | |
System.exit(0); | |
//indexType = "test"; | |
createIndex(client, indexName); | |
//Thread.sleep(1000); | |
put(client); | |
//System.out.println("loop : " + i); | |
String repoName = "hdfs-repo"; | |
System.out.println("PUT END"); | |
// long start_ = new Date().getTime(); | |
// count(client, indexName, indexType); | |
// long end_ = new Date().getTime(); | |
// System.out.println("--------------------------\nEnd count : " + (end_ - start_) + " time elapsed"); | |
// //getDocument(client, indexName, indexType, "1000"); | |
// //searchDocument(client, indexName, indexType, "155431.679016", "TXN_TIME", "TXN_KEY"); | |
List<String> query = Arrays.asList( | |
"*ASDF1186*" | |
, "*QLEGASD1800*" | |
, "*WEFLKJASLD118*" | |
, "*JGKLJDLSF1006*" | |
, "*SLD2186*" | |
, "*DLFKJQWEF1686*" | |
, "*LKJFASD2718*" | |
, "*LKJGQLEGASD235*" | |
, "*KJQWEF1361*" | |
, "*QWEFLKJASLD2*" | |
, "*KLJGKLJDLSF ASDLKJFASD1*" | |
); | |
String index = "test3"; | |
String type = "rv_10000"; | |
//deleteType(client, "test", "ex_1000"); | |
// deleteType(client, "test2", "ex_10000_3"); | |
// System.out.println("delete complete"); | |
// System.exit(0); | |
System.out.println(type + " full text search start"); | |
System.out.println("--------------------------------------------------------------------------------------------------"); | |
for (int i = 0; i <= 9; i++) { | |
System.out.println("LOOP" + i + "test start"); | |
for (String q : query) { | |
long start = new Date().getTime(); | |
String mergeQuery = q; | |
//fullTextSearchCount(client, index, type, mergeQuery, "body"); | |
fullTextSearchDocument(client, index, type, mergeQuery, "body"); | |
long end = new Date().getTime(); | |
System.out.println("\t--------------End search : " + "query:" + mergeQuery + ", " + (end - start) + " time elapsed"); | |
} | |
System.out.println("end of test---------------------------------------------------------------------------------------"); | |
// Thread.sleep(5000); | |
} | |
// long start1 = new Date().getTime(); | |
// fullTextSearchCount(client, indexName, indexType, " tmtm", "body"); | |
// long end2 = new Date().getTime(); | |
// System.out.println("--------------------------\nEnd count : " + (end2 - start1) + " time elapsed"); | |
// } catch (IOException | InterruptedException | ExecutionException e) { | |
// e.printStackTrace(); | |
// } | |
//Connection close | |
client.close(); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment