Skip to content

Instantly share code, notes, and snippets.

Created September 16, 2011 09:19
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
Star You must be signed in to star a gist
Save anonymous/1221648 to your computer and use it in GitHub Desktop.
ES problem with realtime get
package com.example;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.node.NodeBuilder;
import org.json.simple.JSONObject;
/**
 * Stress-test reproducer for a problem with Elasticsearch realtime get.
 *
 * <p>A pool of worker threads repeatedly indexes a small document under a random id and
 * immediately fetches it back by that id, printing {@code ok} when the document is returned
 * and a "not found" line when it is not.
 *
 * <p>The program is meant to run until it is killed; interrupting the main thread stops the
 * feeder and the worker pool.
 */
public class RealTimeChecker {
    private static final String INDEX = "myindex";
    private static final String TYPE = "type";

    /** Number of worker threads; also used as the capacity of the bounded work queue. */
    private static final int WORKERS = 50;

    /**
     * Starts a client node, creates the test index, and keeps the worker pool busy with
     * {@link Checker} tasks until the feeder thread ends or this thread is interrupted.
     *
     * @param args ignored
     * @throws InterruptedException if the main thread is interrupted while waiting for the feeder
     */
    public static void main(String[] args) throws InterruptedException {
        Settings settings = ImmutableSettings.settingsBuilder()
                .put("index.number_of_shards", 2)
                .put("index.number_of_replicas", 1)
                .build();
        Client client = NodeBuilder.nodeBuilder()
                .settings(settings)
                .loadConfigSettings(false)
                .client(true)
                .node()
                .client();

        // Constructing the executor does not start any thread yet; that happens in
        // prestartAllCoreThreads() below, once the index has been created.
        LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>(WORKERS);
        ThreadPoolExecutor executor =
                new ThreadPoolExecutor(WORKERS, WORKERS, 0, TimeUnit.MILLISECONDS, queue);
        Thread feeder = new Thread(new Feeder(queue, client));
        try {
            // NOTE(review): presumably this fails when the index already exists from a
            // previous run -- confirm against the Elasticsearch version in use.
            client.admin().indices().create(new CreateIndexRequest(INDEX, settings)).actionGet();

            // Tasks are never submitted through execute(); the feeder puts them straight into
            // the executor's work queue. The workers therefore have to be started up front so
            // that something is already polling the queue.
            executor.prestartAllCoreThreads();
            feeder.start();
            feeder.join();
        } finally {
            // Reached when the feeder ends, when index creation fails, or when the main thread
            // is interrupted: stop producing work and stop the workers so that the non-daemon
            // pool threads do not outlive main().
            feeder.interrupt();
            executor.shutdownNow();
            client.close();
            // NOTE(review): the Node returned by node() is not retained, so it is not closed
            // here -- confirm whether Node.close() is also required for a clean shutdown.
        }
    }

    /**
     * Producer that keeps the shared work queue full of {@link Checker} tasks.
     *
     * <p>{@code put} blocks while the bounded queue is full, which throttles the producer to the
     * speed of the workers. The loop ends when the feeder thread is interrupted.
     */
    private static class Feeder implements Runnable {
        private final BlockingQueue<Runnable> queue;
        private final Client client;

        Feeder(BlockingQueue<Runnable> queue, Client client) {
            this.queue = queue;
            this.client = client;
        }

        @Override
        public void run() {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    queue.put(new Checker(client));
                } catch (InterruptedException e) {
                    // An interrupt is the only way to ask this loop to stop: keep the flag set
                    // for whoever inspects the thread afterwards, and leave.
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    }

    /**
     * One index-then-get round trip: indexes a document under a random id, reads it back by
     * that id, and reports on standard output whether the get returned it.
     */
    private static class Checker implements Runnable {
        private final Client client;

        Checker(Client client) {
            this.client = client;
        }

        // JSONObject.put is flagged as an unchecked call by the compiler.
        @SuppressWarnings("unchecked")
        @Override
        public void run() {
            Random random = new Random();
            JSONObject doc = new JSONObject();
            doc.put("age", random.nextInt(122));
            doc.put("height", 1.80);
            doc.put("text", "Hey look at me I'm being indexed");
            String id = String.valueOf(random.nextInt());

            // Wait for the index request to complete, then immediately fetch the same id.
            client.prepareIndex(INDEX, TYPE, id).setSource(doc.toString()).execute().actionGet();
            GetResponse response = client.prepareGet(INDEX, TYPE, id).execute().actionGet();
            if (!response.exists()) {
                System.out.println("Document with id " + id + " not found");
            } else {
                System.out.println("ok");
            }
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment