Skip to content

Instantly share code, notes, and snippets.

@henrikno
Created September 2, 2015 13:52
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save henrikno/e0ebd6804cb62491343c to your computer and use it in GitHub Desktop.
Save henrikno/e0ebd6804cb62491343c to your computer and use it in GitHub Desktop.
Elasticsearch bulk index with retry and failure check
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.elasticsearch.action.WriteConsistencyLevel;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
/**
 * Manual stress test that bulk-indexes documents into Elasticsearch through a
 * {@link TransportClient}, retrying every bulk until it succeeds, and then reads
 * each acknowledged document back to report how many are missing.
 *
 * <p>Expects transport endpoints on localhost ports 9300, 9301 and 9302 (see
 * {@link #setup()}). Progress and results are written to standard output.
 */
public class ElasticSearchTest {
    /** Index that is created on demand and written to by the test. */
    public static final String INDEX_NAME = "dummy_index";

    /** Mapping type used for every indexed and fetched document. */
    private static final String TYPE_NAME = "dummy";

    /** Number of bulk requests sent in one run. */
    private static final int NUMBER_OF_REQUESTS = 200;

    /** Number of index operations per bulk request. */
    private static final int BULK_SIZE = 1000;

    /** Pause between retry attempts so a failing cluster is not hit in a tight loop. */
    private static final long RETRY_DELAY_MS = 1000L;

    /**
     * JSON source used for every document. It contains no {@code <<documentId>>}
     * placeholder, so the replace call in {@link #indexBulk} leaves it unchanged
     * unless the template is edited to include one.
     */
    private static final String DOCUMENT = "{\"data\": \"Insert some interesting data here\"}";

    private TransportClient client;

    /**
     * Entry point: runs the test once.
     *
     * @param args ignored
     * @throws Exception if the run fails or is interrupted
     */
    public static void main(String[] args) throws Exception {
        new ElasticSearchTest().run();
    }

    /**
     * Connects, ensures the index exists, sends all bulks (retrying each until it
     * passes), prints timing statistics, and verifies every acknowledged document.
     * The client is always closed before this method returns.
     *
     * @throws InterruptedException if the thread is interrupted while waiting
     *         between retry attempts
     */
    public void run() throws InterruptedException {
        try {
            setup();
            if (!client.admin().indices().prepareExists(INDEX_NAME).get().isExists()) {
                client.admin().indices().prepareCreate(INDEX_NAME).get();
            }
            System.out.println("Waiting...");
            client.admin().cluster().prepareHealth().setWaitForGreenStatus().get();
            System.out.println("Starting...");

            long startTime = System.nanoTime();
            List<String> ids = new ArrayList<>();
            for (int j = 0; j < NUMBER_OF_REQUESTS; j++) {
                retry(j, batch -> indexBulk(batch, ids));
            }
            client.admin().cluster().prepareHealth().setWaitForGreenStatus().get();
            long endTime = System.nanoTime();

            long msUsed = TimeUnit.NANOSECONDS.toMillis(endTime - startTime);
            System.out.println("ms used: " + msUsed);
            System.out.println("Documents: " + ids.size());
            System.out.println("DPS: " + (1000000000.0 * ids.size() / (endTime - startTime)));

            printDocumentCount();
            System.out.println("Checking documents");
            System.out.println("Documents Lost: " + countMissing(ids));
            printDocumentCount();
        } finally {
            // Release the transport client even when the run fails part-way.
            if (client != null) {
                client.close();
                client = null;
            }
        }
    }

    /**
     * Builds and executes one bulk request of {@link #BULK_SIZE} index operations
     * and records the ids of its items once the whole bulk has passed all checks.
     *
     * @param batch zero-based bulk number; document ids are
     *        {@code BULK_SIZE * batch + i}
     * @param ids accumulator for the ids of successfully indexed documents
     * @throws RuntimeException if the bulk reports failures or any item reports a
     *         failed shard
     */
    private void indexBulk(int batch, List<String> ids) {
        BulkRequestBuilder bulk = client.prepareBulk();
        for (int i = 0; i < BULK_SIZE; i++) {
            String id = String.valueOf(BULK_SIZE * batch + i);
            bulk.add(client.prepareIndex(INDEX_NAME, TYPE_NAME, id)
                    .setConsistencyLevel(WriteConsistencyLevel.ALL)
                    .setSource(DOCUMENT.replace("<<documentId>>", id)));
        }
        System.out.println("Bulk " + batch + " started");
        BulkResponse bulkResponse = bulk.get();
        if (bulkResponse.hasFailures()) {
            throw new RuntimeException(bulkResponse.buildFailureMessage());
        }
        // Collect ids locally first so a shard failure on a later item does not
        // leave partial ids behind that a retry would then duplicate.
        List<String> indexed = new ArrayList<>(BULK_SIZE);
        for (BulkItemResponse item : bulkResponse) {
            if (item.getResponse().getShardInfo().getFailed() > 0) {
                throw new RuntimeException("Shard level failure");
            }
            if (!item.isFailed()) {
                indexed.add(item.getId());
            }
        }
        // Disabled: optional per-bulk flush/refresh.
        // FlushResponse flush = client.admin().indices().prepareFlush(INDEX_NAME).setWaitIfOngoing(true).get();
        // if (flush.getFailedShards() > 0) {
        // throw new RuntimeException("Flush failed");
        // }
        // client.admin().indices().prepareRefresh("dummy_index").get();
        ids.addAll(indexed);
        System.out.println("Bulk " + batch + " finished");
    }

    /**
     * Fetches every id with a GET request and counts those that do not exist.
     *
     * @param ids ids of documents that were acknowledged as indexed
     * @return number of ids for which the GET reported no document
     */
    private long countMissing(List<String> ids) {
        long notFound = 0;
        for (String idVal : ids) {
            GetResponse getFields = client.prepareGet(INDEX_NAME, TYPE_NAME, idVal).get();
            if (!getFields.isExists()) {
                notFound++;
            }
        }
        return notFound;
    }

    /** Refreshes the index and prints the document count reported for it. */
    private void printDocumentCount() {
        client.admin().indices().prepareRefresh(INDEX_NAME).get();
        long documentCount = client.prepareCount(INDEX_NAME).get().getCount();
        System.out.println("ES document _count = " + documentCount);
    }

    /** Creates the transport client and registers the three localhost transport addresses. */
    private void setup() {
        Settings settings = Settings.settingsBuilder()
                .put("node.client", true)
                .put("client.transport.ignore_cluster_name", true)
                .put("path.home", "/tmp/es_tmp")
                .build();
        client = TransportClient.builder().settings(settings).build();
        client.addTransportAddress(new InetSocketTransportAddress(new InetSocketAddress("localhost", 9300)));
        client.addTransportAddress(new InetSocketTransportAddress(new InetSocketAddress("localhost", 9301)));
        client.addTransportAddress(new InetSocketTransportAddress(new InetSocketAddress("localhost", 9302)));
    }

    /**
     * Runs {@code runnable} with {@code x} until it completes without throwing.
     * Each failure is printed and followed by a pause of {@link #RETRY_DELAY_MS}
     * before the next attempt. The number of attempts is not limited.
     *
     * @param x value passed to the action and shown in the retry message
     * @param runnable action to execute
     * @throws InterruptedException if the thread is interrupted during the pause
     */
    private static void retry(int x, Consumer<Integer> runnable) throws InterruptedException {
        while (true) {
            try {
                runnable.accept(x);
                return;
            } catch (Exception e) {
                e.printStackTrace();
                System.out.println("Retrying bulk ..." + x);
                // Back off instead of spinning while the cluster is unavailable.
                TimeUnit.MILLISECONDS.sleep(RETRY_DELAY_MS);
            }
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment