Skip to content

Instantly share code, notes, and snippets.

@lmader
Created April 14, 2011 18:56
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save lmader/920209 to your computer and use it in GitHub Desktop.
Save lmader/920209 to your computer and use it in GitHub Desktop.
Demonstrates a possible concurrency bug in the elasticsearch parent/child feature
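/**
* Requires a file called document1.pdf. Edit the FILE_NAME constant in the class below to change its path.
*
* Also requires a running elasticsearch cluster named "lmader": this program joins it
* as a client-only node (see the ElasticTest constructor) rather than holding data itself.
*
* Also see the comments on the function main().
*
* Depends only on the libs that come with elasticsearch:
* elasticsearch-0.15.2.jar
* lucene*.jar
*
* Place the ElasticTest.java file and the above jars in the same folder.
*
* Compile (quote the classpath so the shell does not expand the wildcard;
* on Windows use ';' instead of ':' as the separator):
* javac -cp ".:*" ElasticTest.java
* Run:
* java -cp ".:*" ElasticTest
*/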
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.Requests;
import org.elasticsearch.client.action.index.IndexRequestBuilder;
import org.elasticsearch.common.Base64;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.node.Node;
import org.elasticsearch.node.NodeBuilder;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.transport.RemoteTransportException;
/**
 * Reproduction harness: repeatedly indexes a parent document plus a child document
 * holding a PDF as an attachment field, finds the parent through a {@code has_child}
 * query, and then deletes both documents.
 */
public class ElasticTest {
    // Edit the path here if you don't want to place the document in the root of the drive.
    // File.separator makes this root-relative on both Windows ("\") and Unix ("/"); a
    // hard-coded backslash would just be part of the file name on Unix.
    private static final String FILE_NAME = File.separator + "document1.pdf";
    private static final String PARENT_TYPE_NAME = "content";
    private static final String CHILD_TYPE_NAME = "contentFiles";
    private static final String INDEX_NAME = "acme";
    /** Maximum number of has_child searches before giving up on finding the document. */
    private static final int MAX_SEARCH_TRIES = 20;
    /** Pause between unsuccessful search attempts, in milliseconds. */
    private static final long SEARCH_RETRY_DELAY_MS = 250;
    /** Number of index/search/delete rounds performed by main(). */
    private static final int NUM_LOOPS = 500;

    /** Client-only node joined to the "lmader" cluster; every request goes through its client. */
    private final Node elasticNode;

    /**
     * Constructor. Joins the "lmader" cluster as a client-only node and creates the index
     * with a mapping that makes the child type's parent the parent type and maps the
     * child's "file" field as an attachment.
     */
    public ElasticTest() {
        NodeBuilder nodeBuilder = NodeBuilder.nodeBuilder();
        Settings settings = nodeBuilder.settings().put("cluster.name", "lmader").build();
        this.elasticNode = nodeBuilder.settings(settings).client(true).node();
        String mapping =
                "{\"" + CHILD_TYPE_NAME + "\": {"
                + "\"_parent\": {"
                + "\"type\" : \"" + PARENT_TYPE_NAME + "\""
                + "},"
                + "\"properties\" : {"
                + "\"file\" : {"
                + "\"type\" : \"attachment\""
                + "}}}}";
        try {
            client().admin().indices()
                    .create(new CreateIndexRequest(INDEX_NAME).mapping(CHILD_TYPE_NAME, mapping))
                    .actionGet();
        } catch (RemoteTransportException e) {
            // Best effort: this usually means the index already exists from an earlier run.
            // Report it instead of swallowing it, since it could also hide a real mapping error.
            System.err.println("Index creation skipped (index may already exist): " + e.getMessage());
        }
    }

    /**
     * Returns the client of the node created in the constructor. The client is obtained
     * from the node rather than created per call, so request methods do not close it;
     * use {@link #close()} to shut the node down.
     */
    private Client client() {
        return this.elasticNode.client();
    }

    /**
     * Closes the elasticsearch client node created by the constructor.
     */
    public void close() {
        this.elasticNode.close();
    }

    /**
     * Deletes the item from both the parent and child type locations.
     *
     * NOTE(review): child documents are presumably routed by their parent id; deleting the
     * child by its own id only reaches the right shard because indexDoc() gives the child
     * the same id as its parent — confirm before reusing with different ids.
     *
     * @param id id shared by the parent and child documents
     */
    public void deleteById(String id) {
        Client client = client();
        client.prepareDelete(INDEX_NAME, PARENT_TYPE_NAME, id).execute().actionGet();
        client.prepareDelete(INDEX_NAME, CHILD_TYPE_NAME, id).execute().actionGet();
    }

    /**
     * Indexes a parent document and waits for the index response.
     *
     * @param id document id
     * @param objectMap document fields, serialized to JSON
     * @throws IOException if the map cannot be serialized
     */
    public void indexParent(String id, Map<String, Object> objectMap) throws IOException {
        XContentBuilder source = XContentFactory.jsonBuilder().map(objectMap);
        client().prepareIndex(INDEX_NAME, PARENT_TYPE_NAME, id)
                .setSource(source)
                .execute()
                .actionGet();
    }

    /**
     * Indexes the file as a child document whose parent is the document with the same id.
     * The file content is stored Base64-encoded in the "file" field.
     *
     * @param id id used both for the child document and as its parent id
     * @param fileData raw file bytes
     * @throws IOException if the source cannot be serialized
     */
    public void indexFileAsChild(String id, byte[] fileData) throws IOException {
        Map<String, Object> fileMap = new HashMap<String, Object>();
        fileMap.put("file", Base64.encodeBytes(fileData));
        XContentBuilder source = XContentFactory.jsonBuilder().map(fileMap);
        client().prepareIndex(INDEX_NAME, CHILD_TYPE_NAME, id)
                .setParent(id)
                .setSource(source)
                .execute()
                .actionGet();
    }

    /**
     * Executes a search based on a JSON String in QueryDSL format.
     *
     * Throws a RuntimeException if there are any shard failures, to elevate the
     * visibility of the problem.
     *
     * @param source QueryDSL search body
     * @return the source of every hit, as a JSON string
     */
    public List<String> executeSearch(String source) {
        SearchRequest request = Requests.searchRequest(INDEX_NAME).source(source);
        SearchResponse response = client().search(request).actionGet();
        List<ShardSearchFailure> failures = Arrays.asList(response.getShardFailures());
        if (!failures.isEmpty()) {
            // throw so that shard failures are not hidden behind an empty result
            throw new RuntimeException(failures.toString());
        }
        List<String> results = new ArrayList<String>();
        for (SearchHit hit : response.hits()) {
            results.add(hit.sourceAsString());
        }
        return results;
    }

    /**
     * Loads a whole file from disk.
     *
     * @param fileName path of the file to read
     * @return the file's bytes
     * @throws IOException if the file cannot be opened or read, or is too large for an array
     * @throws RuntimeException ("Error reading file") if the stream ends before the
     *     length reported at open time has been read
     */
    public byte[] loadFile(String fileName) throws IOException {
        File file = new File(fileName);
        long length = file.length();
        if (length > Integer.MAX_VALUE) {
            throw new IOException("File too large to load into memory: " + fileName);
        }
        byte[] fileData = new byte[(int) length];
        FileInputStream in = new FileInputStream(file);
        try {
            // A single read() may legally return fewer bytes than requested, so keep
            // reading until the buffer is full or the stream ends early.
            int offset = 0;
            while (offset < fileData.length) {
                int count = in.read(fileData, offset, fileData.length - offset);
                if (count < 0) {
                    throw new RuntimeException("Error reading file");
                }
                offset += count;
            }
            return fileData;
        } finally {
            in.close();
        }
    }

    /**
     * Creates a document as a parent and indexes it, then loads FILE_NAME and indexes
     * it as a child of that document.
     *
     * @return the id shared by the parent and child documents
     * @throws IOException if the file cannot be read or a document cannot be serialized
     */
    public String indexDoc() throws IOException {
        String id = UUID.randomUUID().toString();
        byte[] bytes = loadFile(FILE_NAME);
        Map<String, Object> objectMap = new HashMap<String, Object>();
        objectMap.put("title", "this is a document");
        indexParent(id, objectMap);
        // The child reuses the parent's id; deleteById relies on this.
        indexFileAsChild(id, bytes);
        return id;
    }

    /**
     * Performs the has_child query for the doc.
     *
     * Since a newly indexed doc might not be searchable yet, this retries up to
     * MAX_SEARCH_TRIES times, pausing between attempts, until exactly one hit is returned.
     *
     * @throws InterruptedException if interrupted while waiting between attempts
     * @throws IllegalStateException if no attempt returned exactly one hit
     */
    public void searchDocByChild() throws InterruptedException {
        String dslString =
                "{\"query\":{"
                + "\"has_child\":{"
                + "\"query\":{"
                + "\"field\":{"
                + "\"file\":\"Mission statement\"}},"
                + "\"type\":\"" + CHILD_TYPE_NAME + "\"}}}";
        int hitCount = 0;
        for (int attempt = 1; attempt <= MAX_SEARCH_TRIES; attempt++) {
            hitCount = executeSearch(dslString).size();
            if (hitCount == 1) {
                return;
            }
            // no point sleeping after the final attempt
            if (attempt < MAX_SEARCH_TRIES) {
                Thread.sleep(SEARCH_RETRY_DELAY_MS);
            }
        }
        throw new IllegalStateException("Exceeded number of retries (" + MAX_SEARCH_TRIES
                + " searches; last search returned " + hitCount + " hits)");
    }

    /**
     * Program to loop on:
     * create parent/child doc
     * search for the doc
     * delete the doc
     * repeat the above until shard failure.
     *
     * Eventually fails with:
     *
     * [shard [[74wz0lrXRSmSOsJOqgPvlw][acme][1]], reason [RemoteTransportException
     * [[Kismet][inet[/10.10.30.52:9300]][search/phase/query]]; nested:
     * QueryPhaseExecutionException[[acme][1]:
     * query[ConstantScore(child_filter[contentFiles
     * /content](filtered(file:mission
     * file:statement)->FilterCacheFilterWrapper(
     * _type:contentFiles)))],from[0],size[10]: Query Failed [Failed to execute
     * child query [filtered(file:mission
     * file:statement)->FilterCacheFilterWrapper(_type:contentFiles)]]]; nested:
     * ]]
     *
     * The node is always closed before exiting; the exit status is 1 if any step failed.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        ElasticTest elasticTest = null;
        int exitCode = 0;
        try {
            elasticTest = new ElasticTest();
            // loop a bunch of times - usually fails before the count is done.
            System.out.println();
            System.out.println("Looping [" + NUM_LOOPS + "] times:");
            System.out.println();
            for (int i = 0; i < NUM_LOOPS; i++) {
                String id = elasticTest.indexDoc();
                elasticTest.searchDocByChild();
                elasticTest.deleteById(id);
                System.out.println(" Success: " + i);
            }
        } catch (Exception e) {
            e.printStackTrace();
            exitCode = 1;
        } finally {
            if (elasticTest != null) {
                elasticTest.close();
            }
        }
        // exit explicitly so lingering node threads cannot keep the JVM alive
        System.exit(exitCode);
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment