Created
April 14, 2011 18:56
-
-
Save lmader/920209 to your computer and use it in GitHub Desktop.
Demonstrates a possible concurrency bug in the Elasticsearch parent/child feature
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Requires a file called document1.pdf. See the class member declaration below to edit the path.
 *
 * Also see the comments on the function main().
 *
 * Depends only on the libs that come with elasticsearch:
 *   elasticsearch-0.15.2.jar
 *   lucene*.jar
 *
 * Place the ElasticTest.java file and the above jars in the same folder.
 *
 * Compile:
 *   javac -cp ".:*" ElasticTest.java
 * Run:
 *   java -cp ".:*" ElasticTest
 */
import java.io.File; | |
import java.io.FileInputStream; | |
import java.io.IOException; | |
import java.util.ArrayList; | |
import java.util.Arrays; | |
import java.util.HashMap; | |
import java.util.List; | |
import java.util.Map; | |
import java.util.UUID; | |
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; | |
import org.elasticsearch.action.search.SearchRequest; | |
import org.elasticsearch.action.search.SearchResponse; | |
import org.elasticsearch.action.search.ShardSearchFailure; | |
import org.elasticsearch.client.Client; | |
import org.elasticsearch.client.Requests; | |
import org.elasticsearch.client.action.index.IndexRequestBuilder; | |
import org.elasticsearch.common.Base64; | |
import org.elasticsearch.common.settings.Settings; | |
import org.elasticsearch.common.xcontent.XContentBuilder; | |
import org.elasticsearch.common.xcontent.XContentFactory; | |
import org.elasticsearch.node.Node; | |
import org.elasticsearch.node.NodeBuilder; | |
import org.elasticsearch.search.SearchHit; | |
import org.elasticsearch.transport.RemoteTransportException; | |
public class ElasticTest { | |
private Node elasticNode; | |
// edit the path here if you don't want to place the document in the root of the drive | |
private static final String FILE_NAME = "\\document1.pdf"; | |
private static final String PARENT_TYPE_NAME = "content"; | |
private static final String CHILD_TYPE_NAME = "contentFiles"; | |
private static final String INDEX_NAME = "acme"; | |
/** | |
* Constructor. Initialize elastic and create the index/mapping | |
*/ | |
public ElasticTest() { | |
NodeBuilder nodeBuilder = NodeBuilder.nodeBuilder(); | |
Settings settings = nodeBuilder.settings().put("cluster.name","lmader").build(); | |
this.elasticNode = nodeBuilder.settings(settings).client(true).node(); | |
String mapping = | |
"{\"contentFiles\": {" + | |
"\"_parent\": {" + | |
"\"type\" : \"content\"" + | |
"}," + | |
"\"properties\" : {" + | |
"\"file\" : {" + | |
"\"type\" : \"attachment\"" + | |
"}}}}"; | |
try { | |
this.elasticNode.client().admin().indices().create(new CreateIndexRequest(INDEX_NAME).mapping(CHILD_TYPE_NAME, mapping)).actionGet(); | |
} catch (RemoteTransportException e){ | |
// usually means the index is already created. | |
} | |
} | |
/** | |
* Deletes the item from both the parent and child type locations. | |
*/ | |
public void deleteById(String id) { | |
Client client = this.elasticNode.client(); | |
client.prepareDelete(INDEX_NAME, PARENT_TYPE_NAME, id).execute().actionGet(); | |
client.prepareDelete(INDEX_NAME, CHILD_TYPE_NAME, id).execute().actionGet(); | |
} | |
/** | |
* Index a parent doc | |
*/ | |
public void indexParent(String id, Map<String, Object> objectMap) throws IOException { | |
Client client = this.elasticNode.client(); | |
XContentBuilder builder = XContentFactory.jsonBuilder(); | |
// index content | |
client.prepareIndex(INDEX_NAME, PARENT_TYPE_NAME, id).setSource(builder.map(objectMap)).execute().actionGet(); | |
client.close(); | |
} | |
/** | |
* Index the file as a child doc | |
*/ | |
public void indexFileAsChild(String id, byte[] fileData) throws IOException { | |
Map<String, Object> fileMap = new HashMap<String, Object>(); | |
fileMap.put("file", new String(Base64.encodeBytes(fileData))); | |
Client client = this.elasticNode.client(); | |
XContentBuilder builder = XContentFactory.jsonBuilder(); | |
IndexRequestBuilder indexRequestbuilder = client.prepareIndex(INDEX_NAME, CHILD_TYPE_NAME, id); | |
indexRequestbuilder = indexRequestbuilder.setParent(id); | |
indexRequestbuilder = indexRequestbuilder.setSource(builder.map(fileMap)); | |
indexRequestbuilder.execute().actionGet(); | |
client.close(); | |
} | |
/** | |
* Execute a search based on a JSON String in QueryDSL format. | |
* | |
* Throws a RuntimeException if there are any shard failures to | |
* elevate the visibility of the problem. | |
*/ | |
public List<String> executeSearch(String source) { | |
Client client = this.elasticNode.client(); | |
SearchRequest request = Requests.searchRequest(INDEX_NAME).source(source); | |
List<ShardSearchFailure> failures; | |
SearchResponse response; | |
try { | |
response = client.search(request).actionGet(); | |
failures = Arrays.asList(response.getShardFailures()); | |
// throw an exception so that we see the shard failures | |
if (failures.size() != 0) { | |
throw new RuntimeException(failures.toString()); | |
} | |
ArrayList<String> results = new ArrayList<String>(); | |
if (response != null) { | |
for (SearchHit hit : response.hits()) { | |
String sourceStr = hit.sourceAsString(); | |
results.add(sourceStr); | |
} | |
} | |
return results; | |
} finally { | |
client.close(); | |
} | |
} | |
/** | |
* Load a file from disk. | |
*/ | |
public byte[] loadFile(String fileName) throws IOException { | |
FileInputStream in = null; | |
try { | |
File file = new File(fileName); | |
in = new FileInputStream(file); | |
long length = file.length(); | |
byte[] fileData = new byte[(int) length]; | |
int count = in.read(fileData); | |
if (count != length) { | |
throw new RuntimeException("Error reading file"); | |
} | |
return fileData; | |
} finally { | |
if (in != null) { | |
in.close(); | |
} | |
} | |
} | |
/** | |
* Create a document as a parent and index it. | |
* Load a file and index it as a child. | |
*/ | |
public String indexDoc() throws IOException { | |
String id = UUID.randomUUID().toString(); | |
byte[] bytes = loadFile(FILE_NAME); | |
if (bytes == null) { | |
throw new RuntimeException("fileData is null"); | |
} | |
Map<String, Object> objectMap = new HashMap<String, Object>(); | |
objectMap.put("title", "this is a document"); | |
this.indexParent(id, objectMap); | |
this.indexFileAsChild(id, bytes); | |
return id; | |
} | |
/** | |
* Perform the has_child query for the doc. | |
* | |
* Since it might take time to get indexed, it | |
* loops until it finds the doc. | |
*/ | |
public void searchDocByChild() throws InterruptedException { | |
String dslString = | |
"{\"query\":{" + | |
"\"has_child\":{" + | |
"\"query\":{" + | |
"\"field\":{" + | |
"\"file\":\"Mission statement\"}}," + | |
"\"type\":\"contentFiles\"}}}"; | |
int numTries = 0; | |
List<String> items = new ArrayList<String>(); | |
while (items.size() != 1 && numTries < 20) { | |
items = executeSearch(dslString); | |
numTries++; | |
if (items.size() != 1) { | |
Thread.sleep(250); | |
} | |
} | |
if (items.size() != 1) { | |
System.out.println("Exceeded number of retries"); | |
System.exit(1); | |
} | |
} | |
/** | |
* Program to loop on: | |
* create parent/child doc | |
* search for the doc | |
* delete the doc | |
* repeat the above until shard failure. | |
* | |
* Eventually fails with: | |
* | |
* [shard [[74wz0lrXRSmSOsJOqgPvlw][acme][1]], reason [RemoteTransportException | |
* [[Kismet][inet[/10.10.30.52:9300]][search/phase/query]]; nested: | |
* QueryPhaseExecutionException[[acme][1]: | |
* query[ConstantScore(child_filter[contentFiles | |
* /content](filtered(file:mission | |
* file:statement)->FilterCacheFilterWrapper( | |
* _type:contentFiles)))],from[0],size[10]: Query Failed [Failed to execute | |
* child query [filtered(file:mission | |
* file:statement)->FilterCacheFilterWrapper(_type:contentFiles)]]]; nested: | |
* ]] | |
* | |
* @param args | |
*/ | |
public static void main(String[] args) { | |
try { | |
ElasticTest elasticTest = new ElasticTest(); | |
// loop a bunch of times - usually fails before the count is done. | |
int NUM_LOOPS = 500; | |
System.out.println(); | |
System.out.println("Looping [" + NUM_LOOPS + "] times:"); | |
System.out.println(); | |
for (int i = 0; i < NUM_LOOPS; i++) { | |
String id = elasticTest.indexDoc(); | |
elasticTest.searchDocByChild(); | |
elasticTest.deleteById(id); | |
System.out.println(" Success: " + i); | |
} | |
} catch (Exception e) { | |
e.printStackTrace(); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment