Skip to content

Instantly share code, notes, and snippets.

@dav-rob
Created October 19, 2011 09:52
Show Gist options
  • Save dav-rob/1297867 to your computer and use it in GitHub Desktop.
Save dav-rob/1297867 to your computer and use it in GitHub Desktop.
ElasticSearch Bulk Indexing from JDBC Input
package ****.elasticsearch.index;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.deletebyquery.DeleteByQueryResponse;
import org.elasticsearch.action.deletebyquery.IndexDeleteByQueryResponse;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Requests;
import org.elasticsearch.client.action.bulk.BulkRequestBuilder;
import org.elasticsearch.client.action.delete.DeleteRequestBuilder;
import org.elasticsearch.client.action.get.GetRequestBuilder;
import org.elasticsearch.client.action.search.SearchRequestBuilder;
import org.elasticsearch.index.query.MatchAllQueryBuilder;
import org.elasticsearch.index.query.QueryStringQueryBuilder;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.springframework.beans.factory.annotation.Autowired;
import ****.index.bean.JsonBean;
import ****.index.compass.IIndexing;
import ****.index.jdbc.BuildIndex;
import ****.util.CSUtils;
public abstract class BaseESIndexer implements IIndexing<JsonBean>{
private final static Log LOG = LogFactory.getLog(BaseESIndexer.class);
@Autowired
protected ESNodeWrapper esNode;
protected String indexName = null;
protected int bulkSize = 100;
protected int dropThreshold = 10;
protected final AtomicInteger onGoingBulks = new AtomicInteger();
protected int bulkOperations = 0;
/**
 * Initialization hook meant to be called once the {@link ESNodeWrapper} has been added.
 *
 * <p>Currently a no-op: the open-or-create-index logic below is commented out, so this
 * method neither touches the cluster nor throws.
 */
protected void init() {
// new indexes should only be created by (nightly, weekly) indexing processes
/* try {
OpenIndexResponse resp = esNode.getClient()
.admin()
.indices().prepareOpen(indexName).execute().actionGet();
} catch (ElasticSearchException e) {
LOG.info("Error opening Index: " + indexName + ", will now create that index.");
e.printStackTrace();
// create index if one is not already present.
ESAdmin.createIndex(esNode, indexName);
} */
}
/**
 * Deletes every document in the current index by issuing a match-all delete-by-query
 * and blocking until it completes. The response is not inspected.
 */
public void clear() {
    esNode.getClient()
            .prepareDeleteByQuery(indexName)
            .setQuery(new MatchAllQueryBuilder())
            .execute()
            .actionGet();
}
/**
 * Searches the current index with the query string {@code id:<contactId>}.
 *
 * @param contactId value matched against the {@code id} field
 * @return the source map of the first hit, or {@code null} when the search returns no hits
 */
public Map load(Integer contactId) {
    QueryStringQueryBuilder idQuery = new QueryStringQueryBuilder("id:" + contactId);
    SearchResponse response = esNode.getClient()
            .prepareSearch()
            .setIndices(this.getIndexName())
            .setQuery(idQuery)
            .execute()
            .actionGet();
    SearchHit[] found = response.getHits().getHits();
    boolean nothingFound = (found == null) || (found.length == 0);
    if (nothingFound) {
        return null;
    }
    return found[0].getSource();
}
/**
 * Deletes, via delete-by-query on {@code id:<id>}, every matching document of the current
 * index. A warning is logged for each index that reports failed shards; nothing is thrown
 * for those failures.
 *
 * @param id value matched against the {@code id} field
 */
public void remove(int id) {
    LOG.info("Starting to remove id[" + id + "]");
    DeleteByQueryResponse response = esNode.getClient()
            .prepareDeleteByQuery(indexName)
            .setQuery(new QueryStringQueryBuilder("id:" + id))
            .execute()
            .actionGet();
    Map<String, IndexDeleteByQueryResponse> perIndex = response.indices();
    if (perIndex == null) {
        return;
    }
    for (IndexDeleteByQueryResponse indexResult : perIndex.values()) {
        if (indexResult.failedShards() > 0) {
            LOG.warn("Error deleting id[" + id +"]");
        }
    }
    LOG.info("Finished removing id[" + id + "]");
}
/**
 * Issues a refresh request for the current index and blocks until it returns.
 */
public void refresh() {
    esNode.getClient()
            .admin()
            .indices()
            .refresh(new RefreshRequest(indexName))
            .actionGet();
}
/**
 * Removes the given ids one at a time: each id is looked up first and only removed when a
 * document was found.
 *
 * <p>Intended for tens, or at most hundreds, of deletes; use {@code removeBatchBulk} for
 * larger sets.
 *
 * @param ids ids to remove
 */
public void removeBatch(Collection<Integer> ids) {
    for (int id : ids) {
        if (load(id) == null) {
            // nothing indexed under this id, so there is nothing to delete
            continue;
        }
        remove(id);
    }
}
/**
 * Removes the given ids using bulk delete requests.
 *
 * <p>Each id is first searched for with the query string {@code id:<id>}; ids without a hit
 * are skipped. The document type needed by the delete request is taken from the hit itself
 * (previously {@code null} was passed as the type, so no delete could be addressed to a
 * concrete document).
 *
 * @param ids ids to remove
 */
public void removeBatchBulk(Collection<Integer> ids) {
    LOG.info("Starting removeBatch of " + ids.size() + " objects on " + getIndexName());
    BulkRequestBuilder bulkOperation = esNode.getClient().prepareBulk();
    for (int id : ids) {
        SearchHit hit = findFirstHitById(id);
        if (hit == null) {
            continue;
        }
        Boolean createNew = bulkDelete(id, bulkOperation, hit.getType());
        if (createNew) {
            // the previous bulk was submitted (or dropped); continue with a fresh one
            bulkOperation = esNode.getClient().prepareBulk();
        }
    }
    finishBulk(bulkOperation, "removeBatch");
    LOG.info("Finished removeBatch of " + ids.size() + " objects on " + getIndexName());
}

/** Returns the first hit of the {@code id:<id>} query on the current index, or null if none. */
private SearchHit findFirstHitById(int id) {
    SearchResponse response = esNode.getClient()
            .prepareSearch()
            .setIndices(getIndexName())
            .setQuery(new QueryStringQueryBuilder("id:" + id))
            .execute()
            .actionGet();
    SearchHit[] hits = response.getHits().getHits();
    return (hits == null || hits.length == 0) ? null : hits[0];
}
/**
 * Indexes all given beans through bulk requests. A new bulk request is started whenever
 * {@code bulkSave} reports that the current one was handed off; whatever remains at the end
 * is flushed with {@code finishBulk}.
 *
 * @param beans documents to index
 */
public void saveBatch(Collection<JsonBean> beans) {
    LOG.info("Starting saveBatch Indexing " + beans.size() + " " + getIndexName() + ", documents.");
    BulkRequestBuilder pending = esNode.getClient().prepareBulk();
    for (JsonBean bean : beans) {
        boolean handedOff = bulkSave(bean, pending);
        if (handedOff) {
            pending = esNode.getClient().prepareBulk();
        }
    }
    // flush whatever did not fill a complete bulk
    finishBulk(pending, "saveBatch");
    LOG.info("Finished saveBatch indexing threads " + beans.size() + " " + getIndexName() + ", documents.");
}
/**
 * Flushes the actions still pending in {@code bulkOperation}.
 *
 * <p>The flush is forced by temporarily lowering {@code bulkSize} to 1, so any non-empty
 * request passes the size check in {@code processBulkIfNeeded}. The previous value is
 * restored in a {@code finally} block so an exception cannot leave the indexer permanently
 * sending one-action bulks.
 *
 * @param bulkOperation request holding the remaining actions
 * @param desc          caller name, used as the log-message prefix
 */
private void finishBulk(BulkRequestBuilder bulkOperation, String desc) {
    int oldBulkSize = bulkSize;
    bulkSize = 1;
    try {
        LOG.info(desc + " Index remaining " + bulkOperation.numberOfActions() + " " + getIndexName() + ", operations.");
        processBulkIfNeeded(bulkOperation, bulkOperation.numberOfActions() + bulkOperations);
    } finally {
        bulkSize = oldBulkSize;
    }
}
/**
 * Indexes a single document synchronously into the current index, using the bean's type,
 * id and JSON source, and requests a refresh as part of the operation. The response is
 * not inspected.
 *
 * @param party document to index
 */
public void save(JsonBean party) {
    esNode.getClient()
            .prepareIndex()
            .setIndex(indexName)
            .setType(party.getIndexType())
            .setId(String.valueOf(party.getId()))
            .setSource(party.getJson())
            .setRefresh(true)
            .execute()
            .actionGet();
}
/**
 * Indexes the given beans through bulk requests, queuing a delete ahead of the index
 * action for every document that already exists.
 *
 * <p>Existence is decided by {@code GetResponse.exists()}. The earlier null check on the
 * response could never be false, so a delete was queued for every bean, including ones
 * that were never indexed.
 *
 * @param parties documents to index or replace
 */
public void saveOrUpdateBatch(Collection<JsonBean> parties) {
    LOG.info("Starting saveOrUpdateBatch Indexing of" + parties.size() + " " + getIndexName() + ", documents.");
    BulkRequestBuilder bulkOperation = esNode.getClient().prepareBulk();
    for (JsonBean jsonBean : parties) {
        GetResponse getResponse = esNode.getClient()
                .prepareGet(jsonBean.getIndexName(), jsonBean.getIndexType(), "" + jsonBean.getId())
                .execute().actionGet();
        if (getResponse != null && getResponse.exists()) {
            Boolean createNew = bulkDelete(jsonBean, bulkOperation);
            if (createNew) {
                bulkOperation = esNode.getClient().prepareBulk();
            }
        }
        Boolean createNew = bulkSave(jsonBean, bulkOperation);
        if (createNew) {
            bulkOperation = esNode.getClient().prepareBulk();
        }
    }
    // index the stragglers remaining (same flush the other batch methods use)
    finishBulk(bulkOperation, "saveOrUpdateBatch");
    LOG.info("Finished starting indexing threads " + parties.size() + " " + getIndexName() + ", documents.");
}
/**
 * Currently a no-op: the only body is the commented-out Compass implementation below,
 * so calling this method does not index or modify anything.
 *
 * NOTE(review): callers wanting save-or-update behavior presumably need {@code save} or
 * {@code saveOrUpdateBatch} instead - confirm before relying on this method.
 *
 * @param party ignored
 */
public void saveOrUpdate(JsonBean party) {
/*CompassSession cmpSess = null;
CompassTransaction tx = null;
try
{
cmpSess = compass.openSession();
tx = cmpSess.beginTransaction();
try{
cmpSess.load("accountindex", party.getParty_id());
cmpSess.delete("accountindex", party.getParty_id());
}catch(CompassException ignore){}
cmpSess.save((AccountIndex)party);
tx.commit();
} catch (CompassException ce) {
handleException(tx, ce);
}finally{
cmpSess.close();
}*/
}
/*@Override
public CompassHits search(String query){
CompassSession cmpSess = null;
CompassHits hits = null;
if (query!=null){
try
{
cmpSess = compass.openSession();
hits = cmpSess.find(query);
} catch (CompassException ce) {
LOG.error("searchExtParty raises exception: ", ce);
}finally{
cmpSess.close();
}
}
return hits;
}*/
/**
 * Currently always returns a new, empty list: the former Compass-based search is commented
 * out and no ElasticSearch query is executed.
 *
 * @param query ignored
 * @return an empty, mutable list
 */
public List<JsonBean> searchList(String query){
List<JsonBean> results = new ArrayList<JsonBean>();
/*CompassSession cmpSess = null;
if (query!=null){
try
{
cmpSess = compass.openSession();
CompassHits hits = cmpSess.find(query);
for (int i = 0; i < hits.getLength(); i++) {
results.add((AccountIndex) hits.data(i));
}
} catch (CompassException ce) {
LOG.error("searchIoi raises exception: ", ce);
}finally{
cmpSess.close();
}
}*/
return results;
}
/**
 * Not supported: always throws, because materializing the whole index into a list is
 * deliberately refused (see the exception message).
 *
 * @return never returns normally
 * @throws UnsupportedOperationException always
 */
public List<JsonBean> loadAllList() {
/*ArrayList<String> results = new ArrayList<String>();
SearchResponse response = esNode.getClient().prepareSearch(indexName).setSearchType(indexType).execute().actionGet();
SearchHits hitsObj = response.getHits();
return hitsObj;*/
throw new UnsupportedOperationException("We don't want to return millions of results in an array list should query through SearchHits API");
}
/*@Override
public CompassHits loadAll() {
CompassHits results = search("alias:accountindex");
return results;
}*/
/**
 * Unimplemented stub: the body is empty, so the given list is ignored.
 *
 * @param contactsLst ignored
 */
public void CUDParties(List<Map<String, Object>> contactsLst) {
// TODO Auto-generated method stub
}
/**
 * Adds a delete action for the given id and type (on the current index) to the bulk
 * request, then lets {@code processBulkIfNeeded} decide whether to submit it.
 *
 * @param id            document id to delete
 * @param bulkOperation bulk request the action is appended to
 * @param type          document type of the delete action
 * @return whatever {@code processBulkIfNeeded} returns; {@code true} means the caller
 *         must continue with a fresh bulk request
 */
public Boolean bulkDelete(Integer id, BulkRequestBuilder bulkOperation, String type) {
    DeleteRequestBuilder deleteAction = esNode.getClient()
            .prepareDelete()
            .setIndex(getIndexName())
            .setId(id.toString())
            .setType(type);
    bulkOperation.add(deleteAction);
    // post-increment: the counter value before this action is what gets passed on
    return processBulkIfNeeded(bulkOperation, bulkOperations++);
}
/**
 * Adds a delete action for the bean's type and id (on the current index) to the bulk
 * request, then lets {@code processBulkIfNeeded} decide whether to submit it.
 *
 * @param jsonBean      supplies the document type and id
 * @param bulkOperation bulk request the action is appended to
 * @return whatever {@code processBulkIfNeeded} returns; {@code true} means the caller
 *         must continue with a fresh bulk request
 */
public Boolean bulkDelete(JsonBean jsonBean, BulkRequestBuilder bulkOperation) {
    DeleteRequestBuilder deleteAction = esNode.getClient()
            .prepareDelete(indexName, jsonBean.getIndexType(), String.valueOf(jsonBean.getId()));
    bulkOperation.add(deleteAction);
    // post-increment: the counter value before this action is what gets passed on
    return processBulkIfNeeded(bulkOperation, bulkOperations++);
}
/**
 * Adds an index action (with {@code create(false)}) for the bean to the bulk request,
 * then lets {@code processBulkIfNeeded} decide whether to submit it.
 *
 * @param jsonBean      supplies type, id and JSON source of the document
 * @param bulkOperation bulk request the action is appended to
 * @return whatever {@code processBulkIfNeeded} returns; {@code true} means the caller
 *         must continue with a fresh bulk request
 */
public Boolean bulkSave(JsonBean jsonBean, BulkRequestBuilder bulkOperation) {
    IndexRequest indexAction = Requests.indexRequest(indexName)
            .type(jsonBean.getIndexType())
            .id(String.valueOf(jsonBean.getId()))
            .create(false)
            .source(jsonBean.getJson());
    bulkOperation.add(indexAction);
    // post-increment: the counter value before this action is what gets passed on
    return processBulkIfNeeded(bulkOperation, bulkOperations++);
}
/**
 * Submits the bulk request asynchronously once it holds at least {@code bulkSize} actions.
 *
 * <p>{@code onGoingBulks} counts requests that have been submitted but not yet completed.
 * When the count would exceed {@code dropThreshold} the request is dropped and an error is
 * registered. The counter is released exactly once per submitted request, whether it
 * completes, fails asynchronously, or fails synchronously. Previously it was released only
 * on a successful response, so each failure permanently consumed a slot until every
 * further bulk was dropped.
 *
 * @param bulkOperation  request to submit when it is large enough
 * @param bulkOperations running operation count, used for log messages only
 * @return {@code true} when the request was submitted or dropped, meaning the caller must
 *         continue with a fresh bulk request; {@code false} when it is still below
 *         {@code bulkSize}
 */
private Boolean processBulkIfNeeded(BulkRequestBuilder bulkOperation, int bulkOperations) {
    if (bulkOperation.numberOfActions() < bulkSize) {
        return false;
    }
    // Use the value returned by the increment for logging instead of re-reading the shared
    // counter, which may already have been changed by a completing bulk.
    final int bulkNo = onGoingBulks.incrementAndGet();
    LOG.info("Ongoing Bulks = " + bulkNo + ", Index Operations = " + bulkOperations );
    if (bulkNo > dropThreshold) {
        // TODO, just wait here!, we can slow down the parsing
        onGoingBulks.decrementAndGet();
        String message = "dropping bulk, " + onGoingBulks + " crossed threshold " + dropThreshold + ", Index Operations = " + bulkOperations;
        LOG.error(message);
        registerError(new Exception(message));
        return true;
    }
    // Guards against releasing the slot twice, e.g. listener callback plus synchronous throw.
    final java.util.concurrent.atomic.AtomicBoolean slotReleased =
            new java.util.concurrent.atomic.AtomicBoolean(false);
    try {
        LOG.info("Executing Bulk Request " + bulkNo + " for index " + getIndexName());
        bulkOperation.execute(new ActionListener<BulkResponse>() {
            @Override public void onResponse(BulkResponse bulkResponse) {
                releaseBulkSlot(slotReleased);
                LOG.info("Bulk [" + bulkNo + "] of " + getIndexName() + " Index Executed");
                if (bulkResponse.hasFailures()) {
                    registerError(new RuntimeException(bulkResponse.buildFailureMessage()));
                }
            }
            @Override public void onFailure(Throwable indexingException) {
                releaseBulkSlot(slotReleased);
                registerError(new Exception(indexingException));
            }
        });
    } catch (Exception indexingException) {
        releaseBulkSlot(slotReleased);
        registerError(indexingException);
    }
    // once we have executed a bulk request, the caller creates a new one for adding
    // a fresh set of bulk updates, deletes, additions etc.
    return true;
}

/** Decrements {@code onGoingBulks} the first time it is called for a given request. */
private void releaseBulkSlot(java.util.concurrent.atomic.AtomicBoolean slotReleased) {
    if (slotReleased.compareAndSet(false, true)) {
        onGoingBulks.decrementAndGet();
    }
}
/**
 * Records an indexing failure in the static {@code BuildIndex} status fields and logs it.
 *
 * @param cause the failure to publish
 */
private void registerError(Exception cause) {
    BuildIndex.statusFlag = BuildIndex.ERROR;
    BuildIndex.exception = cause;
    LOG.error("Indexing Exception", cause);
}
/** @return name of the index (or alias) all operations of this indexer target */
public String getIndexName() {
    return this.indexName;
}

/** @param indexName name of the index (or alias) all operations of this indexer target */
public void setIndexName(String indexName) {
    this.indexName = indexName;
}

/** @return number of actions a bulk request must hold before it is submitted */
public int getBulkSize() {
    return this.bulkSize;
}

/** @param bulkSize number of actions a bulk request must hold before it is submitted */
public void setBulkSize(int bulkSize) {
    this.bulkSize = bulkSize;
}

/** @return number of in-flight bulk requests above which new bulks are dropped */
public int getDropThreshold() {
    return this.dropThreshold;
}

/** @param dropThreshold number of in-flight bulk requests above which new bulks are dropped */
public void setDropThreshold(int dropThreshold) {
    this.dropThreshold = dropThreshold;
}

/** @return the live counter of in-flight bulk requests (not a copy) */
public AtomicInteger getOnGoingBulks() {
    return this.onGoingBulks;
}

/** @return the node wrapper used to obtain the ElasticSearch client */
public ESNodeWrapper getEsNode() {
    return this.esNode;
}
}
====================================================================================================================
package ****.index.bean;
/**
 * Mutable carrier for one document to be indexed: its JSON source, its numeric id, and
 * the index name and index type it belongs to.
 */
public class JsonBean {

    /** Serialized JSON source of the document. */
    private String json;

    /** Numeric id of the document. */
    private int id;

    /** Name of the index the document belongs to. */
    private String indexName;

    /** Type of the document within the index. */
    private String indexType;

    /** Creates an empty bean; all references are {@code null} and the id is 0. */
    public JsonBean() {
    }

    /**
     * Creates a fully populated bean.
     *
     * @param json      serialized JSON source
     * @param id        numeric document id
     * @param indexName name of the target index
     * @param indexType document type within the index
     */
    public JsonBean(String json, int id, String indexName, String indexType) {
        this.json = json;
        this.id = id;
        this.indexName = indexName;
        this.indexType = indexType;
    }

    /** @return the serialized JSON source */
    public String getJson() {
        return this.json;
    }

    /** @param json the serialized JSON source */
    public void setJson(String json) {
        this.json = json;
    }

    /** @return the numeric document id */
    public int getId() {
        return this.id;
    }

    /** @param id the numeric document id */
    public void setId(int id) {
        this.id = id;
    }

    /** @return the name of the target index */
    public String getIndexName() {
        return this.indexName;
    }

    /** @param indexName the name of the target index */
    public void setIndexName(String indexName) {
        this.indexName = indexName;
    }

    /** @return the document type within the index */
    public String getIndexType() {
        return this.indexType;
    }

    /** @param indexType the document type within the index */
    public void setIndexType(String indexType) {
        this.indexType = indexType;
    }
}
===============================================================================================================================
package ****.elasticsearch.buildIndex;
import static ****.elasticsearch.util.ESConstants.ALIAS_CONTACT;
import static ****.elasticsearch.util.ESConstants.INDEX_CONTACT;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.codehaus.jackson.JsonGenerationException;
import org.codehaus.jackson.map.JsonMappingException;
import org.codehaus.jackson.map.ObjectMapper;
import org.springframework.beans.factory.annotation.Autowired;
import ****.elasticsearch.index.ESContactIndexer;
import ****.elasticsearch.index.IndexCleanup;
import ****.elasticsearch.util.ESAdmin;
import ****.index.bean.Contact;
import ****.index.bean.ContactJSONWrapper;
import ****.index.bean.JsonBean;
import ****.index.compass.QueuedIndexingThread;
import ****.index.jdbc.BaseContactJDBCIndexer;
import ****.index.jdbc.BuildIndex;
import ****.index.jdbc.IntegerRowMapper;
import ****.index.jdbc.JDBCIndexer;
import ****.index.jdbc.JDBCQueries;
import ****.index.jdbc.JDBCQueryHelper;
import ****.util.CSUtils;
public class ESContactJDBCIndexer extends BaseContactJDBCIndexer implements JDBCIndexer, ElasticSearchIndexer{ //JdbcDaoSupport{
final static Log LOG = LogFactory.getLog(ESContactJDBCIndexer.class);
QueuedIndexingThread<JsonBean> indexThread = new QueuedIndexingThread<JsonBean>("ESContact Indexer");
@Autowired(required=true) ESContactIndexer esContactIndexing;
/** No-op: the body is empty. */
public void init() {
}
/**
 * Rebuilds the contact index from the database into a newly created index, then moves the
 * {@code ALIAS_CONTACT} alias from the old index to the new one and deletes the old index.
 *
 * <p>If {@code BuildIndex} reports an error once the build step has returned, the method
 * aborts before touching the alias, so searches keep hitting the old, complete index.
 * Previously the alias was swapped and the old index deleted even after a failed build.
 *
 * @throws IllegalStateException if the build step ended with {@code BuildIndex.ERROR}
 * @throws Exception             anything propagated from the cleanup, build or admin steps
 */
public void doIndex() throws Exception {
    IndexCleanup.cleanup(esContactIndexing.getEsNode(), "Contact JDBC");
    Thread.sleep(1000);
    String oldIndexName = getOldIndexName();
    LOG.info("Old Index Name[" + oldIndexName + "]");
    String newIndexName = createNewIndex();
    LOG.info("New Index Name[" + newIndexName + "]");
    esContactIndexing.setIndexName(newIndexName);
    // This is the big data one
    getDBDataAndCreateNewIndex();
    if (BuildIndex.statusFlag == BuildIndex.ERROR) {
        // Never point the alias at a partially built index, and never delete the good one.
        throw new IllegalStateException(
                "Building index [" + newIndexName + "] failed; alias [" + ALIAS_CONTACT
                        + "] is left on [" + oldIndexName + "]",
                BuildIndex.exception);
    }
    LOG.info("Removing alias [" + ALIAS_CONTACT + "] for [" + oldIndexName + "]");
    removeAliasOldIndex(oldIndexName, ALIAS_CONTACT);
    LOG.info("Adding alias [" + ALIAS_CONTACT + "] for [" + newIndexName + "]");
    aliasNewIndex(newIndexName, ALIAS_CONTACT);
    //?????? runWarmUpQueries ????? // Lucene 3.0.3 (I think) needs one-off warm up queries.
    Thread.sleep(1000);
    LOG.info("Deleting index [" + oldIndexName + "]");
    deleteOldIndex(oldIndexName);
}
/** Removes {@code aliasContact} from {@code oldIndexName} via {@code ESAdmin.removeAlias}. */
@Override
public void removeAliasOldIndex(String oldIndexName, String aliasContact) {
    ESAdmin.removeAlias(this.esContactIndexing.getEsNode(), oldIndexName, aliasContact);
}

/** Adds {@code aliasContact} to {@code newIndexName} via {@code ESAdmin.addAlias}. */
@Override
public void aliasNewIndex(String newIndexName, String aliasContact) {
    ESAdmin.addAlias(this.esContactIndexing.getEsNode(), newIndexName, aliasContact);
}

/** Deletes {@code oldIndexName} via {@code ESAdmin.deleteIndex}. */
@Override
public void deleteOldIndex(String oldIndexName) {
    ESAdmin.deleteIndex(this.esContactIndexing.getEsNode(), oldIndexName);
}

/** @return the value of {@code EsJdbcHelper.createNewIndexOfType} for {@code INDEX_CONTACT} */
@Override
public String createNewIndex() {
    return EsJdbcHelper.createNewIndexOfType(INDEX_CONTACT, this.esContactIndexing.getEsNode());
}

/** @return the value of {@code EsJdbcHelper.getOldIndexNameOfType} for {@code INDEX_CONTACT} */
@Override
public String getOldIndexName() {
    return EsJdbcHelper.getOldIndexNameOfType(INDEX_CONTACT, this.esContactIndexing.getEsNode());
}
/**
 * Loads all person ids, splits them into id ranges of roughly 5000 documents, maps each
 * range to contacts and queues them on the background indexing thread. It then waits
 * until that thread reports that it has drained its queue.
 *
 * <p>Returns early, without indexing further ranges, as soon as {@code BuildIndex} reports
 * an error; the caller is expected to check {@code BuildIndex.statusFlag}.
 *
 * @throws InterruptedException  if the calling thread is interrupted while waiting; this
 *                               is no longer swallowed, so the caller cannot mistake an
 *                               interrupted wait for a completed build
 * @throws IllegalStateException if the indexing thread terminated although it still
 *                               reports work to do
 * @throws Exception             anything propagated from mapping or queuing the contacts
 */
@Override
public void getDBDataAndCreateNewIndex() throws Exception, InterruptedException {
    List<Integer> allPersonPartyIds = getPersonIds();
    int size = allPersonPartyIds.size();
    if (size == 0) {
        return;
    }
    if (indexThread.isStarted() && (!indexThread.isAlive() || !indexThread.isStillRunning())) {
        // A java.lang.Thread cannot be started twice. A worker left over from an earlier
        // run would silently ignore everything queued below, so use a fresh one.
        indexThread = new QueuedIndexingThread<JsonBean>("ESContact Indexer");
    }
    indexThread.setIndexingTools(esContactIndexing);
    indexThread.setStopWhenEmpty(false);
    indexThread.setUpdateMode(false);
    LOG.info("*** Number of contacts to Index = " + size);
    // set chunk size to give about 5000 documents per chunk; integer division yields 0
    // for fewer than 5000 ids, so always ask for at least one chunk.
    // NOTE(review): assumes createBetweenIdList cannot cope with 0 chunks - confirm.
    int numberOfChunks = Math.max(1, size / 5000);
    LOG.info("numberOfChunks = " + numberOfChunks);
    List<JDBCQueryHelper.HiLowId> idBetweenList = JDBCQueryHelper.createBetweenIdList(allPersonPartyIds, numberOfChunks);
    int count = 0;
    for (JDBCQueryHelper.HiLowId hiLo : idBetweenList) {
        if (BuildIndex.statusFlag == BuildIndex.ERROR) {
            LOG.error("Error In Indexbuilding, stopping now.", BuildIndex.exception);
            // allow the worker to wind down instead of polling an empty queue forever
            indexThread.setStopWhenEmpty(true);
            return;
        }
        Collection<Contact> parties = mapContacts(hiLo);
        indexCollection(parties);
        ESAdmin.getStats(esContactIndexing.getEsNode(), LOG);
        LOG.info("Pass number " + ++count + " executed.");
    }
    Thread.sleep(2000);
    indexThread.setStopWhenEmpty(true);
    Thread.sleep(5000);
    while (indexThread.isStillRunning()) {
        if (!indexThread.isAlive()) {
            if (indexThread.isStarted()) {
                // The worker died (uncaught exception) while it still had work to do.
                throw new IllegalStateException(
                        "Indexing thread terminated before all queued batches were indexed");
            }
            // Never started: nothing was queued, so there is nothing to wait for.
            break;
        }
        Thread.sleep(5000);
    }
}
/**
 * Runs {@code JDBCQueries.GET_ALL_PERSON_IDS_PROC} and maps every row with an
 * {@code IntegerRowMapper}.
 *
 * @return the ids produced by the query
 */
public List<Integer> getPersonIds() {
    IntegerRowMapper idMapper = new IntegerRowMapper();
    List<Integer> personIds = getJdbcTemplate().query(JDBCQueries.GET_ALL_PERSON_IDS_PROC, idMapper);
    return personIds;
}
/**
 * Replaces the contact indexer used by this class.
 *
 * @param contactIndexing indexer to use from now on
 */
public void setExtPartyIndexing(ESContactIndexer contactIndexing) {
    esContactIndexing = contactIndexing;
}
/**
 * Re-indexes only the given contacts, writing through the {@code ALIAS_CONTACT} alias and
 * refreshing the index before returning.
 *
 * <p>The indexer's index name is switched to the alias even when {@code contactIds} turns
 * out to be empty.
 *
 * @param contactIds contact ids in the form expected by {@code mapContactsLimited};
 *                   {@code null} or empty makes the method log a warning and return
 * @throws Exception anything propagated from mapping, serializing or indexing
 */
public void doIndexLimited(String contactIds) throws Exception {
    esContactIndexing.setIndexName(ALIAS_CONTACT);
    LOG.info("Contact Ids[" + contactIds + "]");
    boolean nothingToIndex = (contactIds == null) || (contactIds.length() == 0);
    if (nothingToIndex) {
        LOG.warn("Empty contactIds!! Returning now.");
        return;
    }
    List<JsonBean> jsonList = getCollectionAsJsonBeanList(mapContactsLimited(contactIds));
    esContactIndexing.saveOrUpdateBatch(jsonList);
    // refresh the index before returning, we do it TWICE deliberately
    for (int pass = 0; pass < 2; pass++) {
        esContactIndexing.refresh();
    }
}
/**
 * Serializes the contacts to JSON beans and queues them on the indexing thread.
 *
 * @param parties contacts to index
 * @throws Exception anything propagated from JSON serialization
 */
void indexCollection(Collection<Contact> parties) throws Exception {
    List<JsonBean> beans = getCollectionAsJsonBeanList(parties);
    LOG.info("created " + beans.size() + " contact json beans.");
    indexJSONCollection(beans);
}
/**
 * Converts each contact into a {@code JsonBean} holding its JSON form (serialized through
 * a {@code ContactJSONWrapper}), its party id, the indexer's current index name and the
 * contact's type. Contacts whose serialization yields {@code null} are skipped. The first
 * JSON string produced is logged as a sample.
 *
 * @param parties contacts to convert
 * @return beans in iteration order of {@code parties}
 */
private List<JsonBean> getCollectionAsJsonBeanList(
        Collection<Contact> parties) throws IOException,
        JsonGenerationException, JsonMappingException {
    List<JsonBean> beans = new ArrayList<JsonBean>(parties.size());
    boolean sampleLogged = false;
    for (Contact contact : parties) {
        ContactJSONWrapper compactView = new ContactJSONWrapper(contact);
        String json = CSUtils.mapper().writeValueAsString(compactView);
        if (json == null) {
            continue;
        }
        if (!sampleLogged) {
            LOG.info("contact json = " + json);
            sampleLogged = true;
        }
        beans.add(new JsonBean(json, contact.getPartyId(), esContactIndexing.getIndexName() , contact.getType()));
    }
    return beans;
}
/**
 * Queues the batch on the indexing thread and starts that thread if it does not yet
 * report itself as started.
 *
 * @param parties batch to queue
 */
private void indexJSONCollection(Collection<JsonBean> parties) {
    indexThread.addToIndexQueAndIncrementBatchNumber(parties);
    if (indexThread.isStarted()) {
        return;
    }
    indexThread.start();
}
}
==============================================================================================================================
package ****.index.compass;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
 * Worker thread that polls a queue of batches once per second and hands each batch to an
 * {@link IIndexing} implementation, using {@code saveOrUpdateBatch} in update mode and
 * {@code saveBatch} otherwise.
 *
 * <p>Thread-safety: batches are added by other threads while this thread polls, so every
 * access to the queue synchronizes on the queue object, and the flags and counters read
 * across threads are {@code volatile}.
 *
 * @param <T> element type of the batches
 */
public class QueuedIndexingThread<T> extends Thread {
    private final static Log LOG = LogFactory.getLog(QueuedIndexingThread.class);

    private final String description;
    private final boolean isPreConfiguredChunkSize;
    /** Pending batches; guarded by synchronizing on this list. */
    private final LinkedList<Collection<T>> que = new LinkedList<Collection<T>>();
    private volatile IIndexing<T> indexing;
    volatile int numberOfBatchesToIndex = 0;
    private volatile boolean isStarted = false;
    private volatile boolean doNotStop = true;
    private volatile boolean stopWhenEmpty = true;
    private volatile Boolean updateMode;
    private volatile int countIndexesFinished;

    /**
     * Use this if you do NOT know how many batches you need to index at instantiation
     * time. - use "addToIndexQueAndIncrementBatchNumber" when adding 'chunks'
     * to index. - do not use "addToIndexQue"
     *
     * - also this way seems to be slower than using predefined chunk size,
     * perhaps because we are changing variable values in the middle of a thread
     * causing more context switching.
     *
     * @param description label used in log messages
     */
    public QueuedIndexingThread(String description) {
        this.numberOfBatchesToIndex = 0;
        this.isPreConfiguredChunkSize = false;
        this.description = description;
    }

    /**
     * Use this if you know how many batches you need to index at instantiation
     * time. - use "addToIndexQue" when adding 'chunks' to index. - do not use
     * "addToIndexQueAndIncrementBatchNumber"
     *
     * @param numberOfBatchesToIndex number of batches that will be queued
     * @param description label used in log messages
     */
    public QueuedIndexingThread(int numberOfBatchesToIndex, String description) {
        this.numberOfBatchesToIndex = numberOfBatchesToIndex;
        this.isPreConfiguredChunkSize = true;
        this.description = description;
    }

    /** @return the configured update mode; {@code null} if it was never set */
    public Boolean getUpdateMode() {
        return updateMode;
    }

    /**
     * @param updateMode {@code true} to index with {@code saveOrUpdateBatch}; {@code false}
     *                   or {@code null} to index with {@code saveBatch}
     */
    public void setUpdateMode(Boolean updateMode) {
        this.updateMode = updateMode;
    }

    /**
     * Marks the thread as started before launching it, so a caller doing
     * {@code if (!isStarted()) start()} cannot call {@code start()} twice in the window
     * before {@link #run()} begins executing.
     */
    @Override
    public synchronized void start() {
        isStarted = true;
        super.start();
    }

    /**
     * Polls the queue until the stop condition holds (all announced batches are finished,
     * {@code stopWhenEmpty} is set and the queue is empty), then sleeps 25 seconds before
     * the thread ends.
     *
     * <p>If the thread is interrupted while polling, the interrupt status is restored and a
     * {@link RuntimeException} wrapping the interruption is thrown. In that case, and when
     * the indexer throws, {@link #isStillRunning()} keeps returning {@code true}.
     */
    @Override
    public void run() {
        isStarted = true;
        LOG.info("Starting Indexing" + description);
        while (doNotStop) {
            try {
                Collection<T> indexAble = pollQueue();
                if (indexAble != null) {
                    LOG.info( String.format( "Starting Indexing [ %s ] Batch number [ %s ]", description, ++countIndexesFinished ) );
                    // Boolean.TRUE.equals avoids an unboxing NullPointerException when
                    // setUpdateMode was never called; unset is treated as "not update mode".
                    if (Boolean.TRUE.equals(updateMode)) {
                        indexing.saveOrUpdateBatch(indexAble);
                    } else {
                        // do not do update when indexing re-index by indexing an new index and deleting the
                        // old one then alias the new index to a constant alias name.
                        indexing.saveBatch(indexAble);
                    }
                    indexAble.clear();
                    LOG.info( String.format( "Finished Indexing [ %s ] Batch number [ %s ]", description, countIndexesFinished ) );
                }
                Thread.sleep(1000);
                doNotStop = (countIndexesFinished < numberOfBatchesToIndex) || !stopWhenEmpty || queueSize() > 0;
            } catch (InterruptedException ex) {
                // keep the interrupt visible to whoever observes this thread
                Thread.currentThread().interrupt();
                throw new RuntimeException(ex);
            }
        }
        LOG.info("Shutting down Indexer" + this);
        try {
            Thread.sleep(25000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            LOG.warn("Interrupted during shutdown delay of " + description, e);
        }
    }

    /**
     * Queues a batch when the number of batches was given to the constructor.
     *
     * @param indexableChunk batch to index
     * @throws IllegalStateException if this thread was created without a batch count
     */
    public void addToIndexQue(Collection<T> indexableChunk) {
        if (!isPreConfiguredChunkSize) {
            throw new IllegalStateException("If you have not pre-defined your chunk size use 'addToIndexQueAndIncrementBatchNumber' to add chunks");
        }
        synchronized (que) {
            que.add(indexableChunk);
        }
    }

    /**
     * Queues a batch and increments the number of batches to index.
     *
     * Note - this way seems to be slower than using predefined chunk size,
     * perhaps because we are changing variable values in the middle of a thread
     * causing more context switching.
     *
     * @param indexableChunk batch to index
     * @throws IllegalStateException if this thread was created with a batch count
     */
    public void addToIndexQueAndIncrementBatchNumber(Collection<T> indexableChunk) {
        if (isPreConfiguredChunkSize) {
            throw new IllegalStateException( "If you have pre-defined your chunk size use 'addToIndexQue' to add chunks");
        }
        int announced;
        synchronized (que) {
            // counter and queue change together so the worker never sees one without the other
            announced = ++this.numberOfBatchesToIndex;
            que.add(indexableChunk);
        }
        LOG.info(String.format(" ADDED BATCH [ %s ] ", announced) );
    }

    /** @return {@code true} once {@link #start()} or {@link #run()} has been invoked */
    public boolean isStarted() {
        return isStarted;
    }

    /** @param indexing indexer that receives every polled batch */
    public void setIndexingTools(IIndexing<T> indexing) {
        this.indexing = indexing;
    }

    /**
     * @return {@code true} until the polling loop has decided to stop. The value stays
     *         {@code true} if the thread was never started or died from an exception, so
     *         combine it with {@link #isAlive()} when waiting for completion.
     */
    public boolean isStillRunning() {
        return doNotStop;
    }

    /** @return a diagnostic string with the individual terms of the stop condition */
    public String stopStatus() {
        return
        String.format( "doNotStop [ %s ] countIndexesFinished < numberOfBatchesToIndex [ %s ], !stopWhenEmpty [ %s ], que.size [ %s ]",
        doNotStop, (countIndexesFinished < numberOfBatchesToIndex), !stopWhenEmpty, queueSize() );
    }

    /**
     * Set this to false to keep the Thread going because we expect more data to
     * be added later. - once all data has been added you must manually set this
     * back to true or the thread will continue for ever.
     *
     * @param stopWhenEmpty whether the thread may stop once its queue is drained
     */
    public void setStopWhenEmpty(boolean stopWhenEmpty) {
        this.stopWhenEmpty = stopWhenEmpty;
    }

    /** Removes and returns the oldest queued batch, or null when the queue is empty. */
    private Collection<T> pollQueue() {
        synchronized (que) {
            return que.poll();
        }
    }

    /** Returns the number of batches currently queued. */
    private int queueSize() {
        synchronized (que) {
            return que.size();
        }
    }
}
======================================================================================================
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment