Navigation Menu

Skip to content

Instantly share code, notes, and snippets.

@karussell
Created February 9, 2011 20:41
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save karussell/819239 to your computer and use it in GitHub Desktop.
Save karussell/819239 to your computer and use it in GitHub Desktop.
merge indices
// this GIST is used for this ticket: https://gist.github.com/819239
// SOLVED when sort against id !
package org.elasticsearch.test.integration.search.scroll;
import org.elasticsearch.action.WriteConsistencyLevel;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.count.CountResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.Requests;
import org.elasticsearch.client.action.bulk.BulkRequestBuilder;
import org.elasticsearch.client.action.index.IndexRequestBuilder;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.index.query.xcontent.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.test.integration.AbstractNodesTests;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.Map;
import static org.elasticsearch.common.xcontent.XContentFactory.*;
import static org.elasticsearch.index.query.xcontent.QueryBuilders.*;
import static org.hamcrest.MatcherAssert.*;
import static org.hamcrest.Matchers.*;
/**
* @author kimchy (shay.banon)
*/
public class SearchScrollTests extends AbstractNodesTests {
private Client client;
@BeforeClass public void createNodes() throws Exception {
startNode("node1");
startNode("node2");
client = getClient();
}
@AfterClass public void closeNodes() {
client.close();
closeAllNodes();
}
protected Client getClient() {
return client("node1");
}
private Logger logger = LoggerFactory.getLogger(getClass());
private String indexType = "tweet";
private String NUM = "num";
@Test public void testScrollWithPreUpdate() throws Exception {
String index1 = "test";
String resindex = "resindex";
try {
client.admin().indices().prepareDelete(index1).execute().actionGet();
client.admin().indices().prepareDelete(resindex).execute().actionGet();
} catch (Exception e) {
// ignore
}
// create indices
createIndex(index1);
createIndex(resindex);
waitForGreen();
waitForGreen();
boolean showStrangeBehaviour = true;
boolean test1 = true;
if (showStrangeBehaviour) {
if (test1)
// will result in "collected:185" BUT should be 200
feedDoc(index1, createDoc(2), "0");
else
// collected:169 BUT should be 200
feedDoc(index1, createDoc(2), "199");
} else {
if (test1)
// collected:201 is okay
feedDoc(index1, createDoc(2), "-1");
else
// collected:201 is okay
feedDoc(index1, createDoc(2), "200");
}
// Thread.sleep(10000);
// create some equal content from 0..199
Map<String, XContentBuilder> map = new LinkedHashMap<String, XContentBuilder>();
for (int i = 0; i < 200; i++) {
map.put("" + i, createDoc(i));
}
bulkUpdate(map, index1);
// Thread.sleep(10000);
System.out.println("index1:" + countAll(index1) + " resindex:" + countAll(resindex));
mergeIndices(Arrays.asList(index1), resindex, 2);
logger.info("200? " + countAll(resindex));
assertThat(countAll(resindex), equalTo(200l));
//node.stop();
}
public void createIndex(String indexName) {
try {
client.admin().indices().prepareCreate(indexName).setSettings(ImmutableSettings.settingsBuilder().put("index.number_of_shards", 2)).execute().actionGet();
//client.admin().indices().create(new CreateIndexRequest(indexName)).actionGet();
} catch (Exception ex) {
logger.info(indexName + " already exists!", ex);
}
}
public void waitForGreen() {
client.admin().cluster().prepareHealth().setWaitForGreenStatus().execute().actionGet();
//client.admin().cluster().health(new ClusterHealthRequest(indexName).waitForYellowStatus()).actionGet();
}
public void refresh(String indexName) {
client.admin().indices().refresh(new RefreshRequest(indexName)).actionGet();
}
public void refresh(String... indices) {
client.admin().indices().refresh(new RefreshRequest(indices)).actionGet();
}
public long countAll(String indexName) {
CountResponse response = client.prepareCount(indexName).
setQuery(QueryBuilders.matchAllQuery()).
execute().actionGet();
return response.getCount();
}
public XContentBuilder createDoc(int num) throws IOException {
XContentBuilder docBuilder = JsonXContent.unCachedContentBuilder().startObject();
docBuilder.field(NUM, num);
docBuilder.endObject();
return docBuilder;
}
public void bulkUpdate(Map<String, XContentBuilder> docs, String indexName) {
// now using bulk API instead of feeding each doc separate with feedDoc
BulkRequestBuilder brb = client.prepareBulk();
for (Map.Entry<String, XContentBuilder> e : docs.entrySet()) {
brb.add(Requests.indexRequest(indexName).type(indexType).id(e.getKey()).source(e.getValue()));
brb.setRefresh(true);
}
if (brb.numberOfActions() > 0) {
BulkResponse rsp = brb.execute().actionGet();
//System.out.println(rsp.items().length);
if (rsp.hasFailures())
System.out.println("Error while bulkUpdate:" + rsp.buildFailureMessage());
}
}
public void mergeIndices(Collection<String> indexList, String intoIndex, int hitsPerPage) {
refresh(indexList.toArray(new String[0]));
refresh(intoIndex);
for (String fromIndex : indexList) {
//String fromIndex = "";
SearchResponse rsp = client.prepareSearch(fromIndex).
setQuery(QueryBuilders.matchAllQuery()).setSize(hitsPerPage).
// THEN_FETCH is even worse
setSearchType(SearchType.QUERY_AND_FETCH).
setScroll(TimeValue.timeValueMinutes(30)).execute().actionGet();
try {
long total = rsp.hits().totalHits();
int collectedResults = 0;
do {
Map<String, XContentBuilder> docs = collectDocs(rsp);
bulkUpdate(docs, intoIndex);
collectedResults += rsp.hits().hits().length;;
rsp = client.prepareSearchScroll(rsp.scrollId()).
setScroll(TimeValue.timeValueMinutes(30)).execute().actionGet();
//logger.info("Progress " + collectedResults + "/" + total + " fromIndex=" + fromIndex);
} while (rsp.hits().hits().length > 0);
//logger.info("Finished copying of index " + fromIndex + ". Total:" + total + " collected:" + collectedResults);
} catch (Exception ex) {
logger.error("Failed to copy data from index " + fromIndex + " into " + intoIndex + ".", ex);
}
}
refresh(intoIndex);
}
public Map<String, XContentBuilder> collectDocs(SearchResponse rsp) throws IOException {
SearchHits docs = rsp.getHits();
Map<String, XContentBuilder> list = new LinkedHashMap<String, XContentBuilder>();
for (SearchHit sd : docs) {
int num = (Integer) sd.getSource().get(NUM);
list.put(sd.getId(), createDoc(num));
}
return list;
}
public void feedDoc(String indexName, XContentBuilder b, String id) {
IndexRequestBuilder irb = client.prepareIndex(indexName, indexType, id).
setConsistencyLevel(WriteConsistencyLevel.DEFAULT).setRefresh(true).
setSource(b);
irb.execute().actionGet();
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment