Skip to content

Instantly share code, notes, and snippets.

@jpotts
Created January 19, 2015 15:20
  • Star 0 You must be signed in to star a gist
  • Fork 2 You must be signed in to fork a gist
Star You must be signed in to star a gist
Save jpotts/68dbe45824c67d664751 to your computer and use it in GitHub Desktop.
Use a scroll and the Bulk API to reindex documents from one Elasticsearch index into another index within the same cluster
/**
 * Command-line utility that copies the documents of one Elasticsearch index
 * into another index, using a scan/scroll search to read and the bulk API to
 * write.
 *
 * <p>Subclasses can override {@link #getJson(SearchHit)} to transform each
 * document before it is written to the target index.
 */
public class Reindexer extends BaseElasticsearchUtility {

    private static final Log logger = LogFactory.getLog(Reindexer.class);

    /** Number of command-line arguments {@link #main(String[])} requires. */
    private static final int EXPECTED_ARG_COUNT = 5;

    /** Hits requested per shard for each scroll round trip. */
    private static final int HITS_PER_SHARD = 100;

    /** Scroll keep-alive (ms) requested on the initial scan search. */
    private static final long INITIAL_SCROLL_KEEP_ALIVE_MS = 60000L;

    /**
     * Scroll keep-alive (ms) requested on every follow-up scroll call. It is
     * longer than the initial one; presumably so the scroll survives while a
     * bulk request is being executed between two scroll calls.
     */
    private static final long SCROLL_KEEP_ALIVE_MS = 600000L;

    /**
     * Entry point. Expects exactly five arguments, in this order:
     * host, port, cluster, current index, new index.
     *
     * <p>Prints the usage text and exits with status -1 when the argument
     * count is wrong or the port is not an integer.
     *
     * @param args the command-line arguments
     */
    public static void main(String[] args) {
        if (args.length != EXPECTED_ARG_COUNT) {
            doUsage();
            System.exit(-1);
        }

        int port;
        try {
            port = Integer.parseInt(args[1]);
        } catch (NumberFormatException nfe) {
            // A malformed port is a usage error, not a crash.
            System.out.println("Invalid port: " + args[1]);
            doUsage();
            System.exit(-1);
            return; // unreachable at runtime; required for definite assignment
        }

        Reindexer reindexer = new Reindexer();
        TransportClient writeClient = reindexer.getClient(args[2], args[0], port);
        try {
            List<Client> writeClients = new ArrayList<Client>();
            writeClients.add(writeClient);
            reindexer.setWriteClients(writeClients);
            reindexer.reindex(args[3], args[4]);
        } finally {
            // Release the client's resources even when reindexing fails.
            writeClient.close();
        }
    }

    /**
     * Prints the command-line usage to standard output.
     */
    public static void doUsage() {
        System.out.println("USAGE:");
        System.out.println(" java Reindexer [host] [port] [cluster] [current index] [new index]");
    }

    /**
     * Copy the current index to another index on the same cluster. The method
     * will attempt to create the target index. If that attempt fails (for
     * example because the index already exists), the failure is logged and
     * the method continues.
     *
     * <p>Each document is written with its original type and id, so running
     * the method again overwrites the copies instead of duplicating them.
     * Bulk failures are logged per failed item; they do not abort the copy.
     *
     * <p>NOTE(review): routing and parent values of the source documents are
     * not carried over; confirm whether the indices in use need them.
     *
     * @param currentIndex The current index.
     * @param targetIndex The target index.
     */
    public void reindex(String currentIndex, String targetIndex) {
        logger.info("Reindexing from " + currentIndex + " to " + targetIndex);

        createTargetIndex(targetIndex);

        SearchResponse scrollResp = getTargetClient().prepareSearch(currentIndex)
                .setSearchType(SearchType.SCAN)
                .setScroll(new TimeValue(INITIAL_SCROLL_KEEP_ALIVE_MS))
                .setSize(HITS_PER_SHARD) // hits per shard returned for each scroll
                .execute()
                .actionGet();

        // Scroll until no hits are returned.
        do {
            // The first response may carry zero hits (this is just how scrolls work),
            // so an empty page is skipped here rather than treated as the end.
            if (scrollResp.getHits().getHits().length > 0) {
                indexBatch(scrollResp, targetIndex);
            }

            scrollResp = getTargetClient().prepareSearchScroll(scrollResp.getScrollId())
                    .setScroll(new TimeValue(SCROLL_KEEP_ALIVE_MS))
                    .execute()
                    .actionGet();
        } while (scrollResp.getHits().getHits().length > 0);

        logger.info("Done reindexing!");
    }

    /**
     * Override this method to transform the object before it is reindexed.
     *
     * @param hit the search hit read from the current index
     * @return JSON representing the transformed document.
     */
    public String getJson(SearchHit hit) {
        return hit.getSourceAsString();
    }

    /** Tries to create the target index; a failure is logged and otherwise ignored. */
    private void createTargetIndex(String targetIndex) {
        try {
            IndicesAdminClient admin = getTargetClient().admin().indices();
            admin.prepareCreate(targetIndex).execute().actionGet();
        } catch (ElasticsearchException ee) {
            // Deliberately best-effort; keep the stack trace for diagnosis.
            logger.warn("Tried to create target index, got: " + ee.getMessage(), ee);
        }
    }

    /** Writes one page of scroll hits to the target index with a single bulk request. */
    private void indexBatch(SearchResponse page, String targetIndex) {
        BulkRequestBuilder bulkRequest = getTargetClient().prepareBulk();
        for (SearchHit hit : page.getHits()) {
            // Reuse the source document's id so the copy keeps its identity.
            bulkRequest.add(getTargetClient()
                    .prepareIndex(targetIndex, hit.getType(), hit.getId())
                    .setSource(getJson(hit)));
        }

        BulkResponse bulkResponse = bulkRequest.execute().actionGet();
        if (!bulkResponse.hasFailures()) {
            logger.info("Batch completed successfully");
            return;
        }

        logger.error("There were failures:");
        for (BulkItemResponse response : bulkResponse.getItems()) {
            // Only failed items have a failure message worth logging.
            if (response.isFailed()) {
                logger.error(response.getFailureMessage());
            }
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment