Skip to content

Instantly share code, notes, and snippets.

@Udith
Last active November 28, 2015 14:25
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Udith/069e21c9d744cf8f4ab7 to your computer and use it in GitHub Desktop.
Save Udith/069e21c9d744cf8f4ab7 to your computer and use it in GitHub Desktop.
package com.sample.es.udith;
import org.elasticsearch.action.admin.indices.optimize.OptimizeResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.FilterBuilders;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import java.util.Calendar;
/**
 * Deletes expired documents from an Elasticsearch index and then optimizes the index.
 *
 * <p>A document is treated as expired when its {@code publishTime} field is at or before
 * "now minus {@code EXPIRE_DAYS} days". Expired document ids are collected with a scan/scroll
 * search and removed with bulk delete requests that are sent in bounded batches.
 *
 * <p>The instance owns a {@link TransportClient}; call {@link #close()} when done with it.
 *
 * @author Udith Gunaratna
 */
public class ESManager {
    /** Index that is cleaned. */
    private static final String ES_INDEX = "my_index";
    /** Document type that is cleaned. */
    private static final String ES_TYPE = "my_record_type";
    /** Field holding the timestamp that is compared against the expiry cutoff. */
    private static final String ES_TIME_FIELD = "publishTime";
    /** Age, in days, after which a document is deleted. */
    private static final int EXPIRE_DAYS = 180;
    /** Keep-alive passed to every scroll request, in milliseconds (10 minutes). */
    private static final long SCROLL_KEEP_ALIVE_MILLIS = 600000L;
    /** Size passed to the scan search request. */
    private static final int SCROLL_PAGE_SIZE = 100;
    /** Number of queued delete actions at which the pending bulk request is sent. */
    private static final int BULK_BATCH_SIZE = 1000;

    private final TransportClient elasticsearch;

    /** Creates a manager connected to the Elasticsearch transport port on localhost:9300. */
    public ESManager() {
        elasticsearch = new TransportClient()
                .addTransportAddress(new InetSocketTransportAddress("localhost", 9300));
    }

    /**
     * Deletes every document whose time field is at or before the expiry cutoff.
     *
     * <p>Prints a one-line summary to standard output. The index is optimized only when at least
     * one bulk request was sent and none of them reported failures.
     */
    public void cleanESDocs() {
        final Calendar calendar = Calendar.getInstance();
        calendar.add(Calendar.DATE, -EXPIRE_DAYS);
        final long cutoffMillis = calendar.getTimeInMillis();

        final CleanupStats stats = new CleanupStats();
        BulkRequestBuilder bulkRequest = elasticsearch.prepareBulk();

        SearchResponse scrollResp = elasticsearch.prepareSearch(ES_INDEX).setTypes(ES_TYPE)
                .setSearchType(SearchType.SCAN)
                .setScroll(new TimeValue(SCROLL_KEEP_ALIVE_MILLIS))
                .setQuery(QueryBuilders.filteredQuery(QueryBuilders.matchAllQuery(),
                        FilterBuilders.rangeFilter(ES_TIME_FIELD).to(cutoffMillis)))
                .setNoFields()
                .setSize(SCROLL_PAGE_SIZE).execute().actionGet();
        try {
            while (true) {
                for (SearchHit hit : scrollResp.getHits().getHits()) {
                    bulkRequest.add(elasticsearch.prepareDelete()
                            .setIndex(ES_INDEX)
                            .setType(ES_TYPE)
                            .setId(hit.getId()));
                }
                // Send the pending deletes once enough have accumulated so that memory use
                // stays bounded no matter how many documents have expired.
                if (bulkRequest.numberOfActions() >= BULK_BATCH_SIZE) {
                    executeBulk(bulkRequest, stats);
                    bulkRequest = elasticsearch.prepareBulk();
                }
                scrollResp = elasticsearch.prepareSearchScroll(scrollResp.getScrollId())
                        .setScroll(new TimeValue(SCROLL_KEEP_ALIVE_MILLIS))
                        .execute().actionGet();
                if (scrollResp.getHits().getHits().length == 0) {
                    break;
                }
            }
        } finally {
            // Release the server-side scroll context instead of waiting for its keep-alive
            // to run out.
            clearScroll(scrollResp.getScrollId());
        }

        if (bulkRequest.numberOfActions() > 0) {
            executeBulk(bulkRequest, stats);
        }

        if (stats.processed == 0 && !stats.failed) {
            System.out.println("Elasticsearch cleaning finished without deleting any documents");
        } else if (stats.failed) {
            System.out.println("Elasticsearch cleaning failed");
        } else {
            System.out.println("Elasticsearch cleaning finished deleting " +
                    stats.processed + " documents");
            optimizeIndex();
        }
    }

    /**
     * Closes the underlying transport client. The manager must not be used afterwards.
     */
    public void close() {
        elasticsearch.close();
    }

    /** Sends one bulk request and folds its outcome into {@code stats}. */
    private void executeBulk(BulkRequestBuilder bulkRequest, CleanupStats stats) {
        BulkResponse bulkResponse = bulkRequest.execute().actionGet();
        stats.processed += bulkResponse.getItems().length;
        if (bulkResponse.hasFailures()) {
            stats.failed = true;
            System.out.println("Elasticsearch bulk delete reported failures: " +
                    bulkResponse.buildFailureMessage());
        }
    }

    /** Clears the given scroll context; a failure here is reported but does not abort cleanup. */
    private void clearScroll(String scrollId) {
        if (scrollId == null) {
            return;
        }
        try {
            elasticsearch.prepareClearScroll().addScrollId(scrollId).execute().actionGet();
        } catch (RuntimeException e) {
            // Best effort: the scroll context expires on its own after the keep-alive.
            System.out.println("Elasticsearch scroll could not be cleared: " + e);
        }
    }

    /** Runs an optimize request with flush on the index and prints the per-shard result counts. */
    private void optimizeIndex() {
        OptimizeResponse optimizeResponse = elasticsearch.admin().indices()
                .prepareOptimize(ES_INDEX)
                .setFlush(true)
                .setOnlyExpungeDeletes(false)
                .execute().actionGet();
        System.out.println("Elasticsearch index optimization finished with " +
                optimizeResponse.getSuccessfulShards() + " successful and " +
                optimizeResponse.getFailedShards() + " failed shards out of " +
                optimizeResponse.getTotalShards() + " total shards");
    }

    /** Running totals accumulated across all bulk requests of one cleanup run. */
    private static final class CleanupStats {
        /** Number of bulk items for which a response was received. */
        private int processed;
        /** Whether any bulk response reported at least one failed item. */
        private boolean failed;
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment