ElasticSearch Bulk Indexing from JDBC Input
package ****.elasticsearch.index;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.deletebyquery.DeleteByQueryResponse;
import org.elasticsearch.action.deletebyquery.IndexDeleteByQueryResponse;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Requests;
import org.elasticsearch.client.action.bulk.BulkRequestBuilder;
import org.elasticsearch.client.action.delete.DeleteRequestBuilder;
import org.elasticsearch.client.action.get.GetRequestBuilder;
import org.elasticsearch.client.action.search.SearchRequestBuilder;
import org.elasticsearch.index.query.MatchAllQueryBuilder;
import org.elasticsearch.index.query.QueryStringQueryBuilder;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.springframework.beans.factory.annotation.Autowired;

import ****.index.bean.JsonBean;
import ****.index.compass.IIndexing;
import ****.index.jdbc.BuildIndex;
import ****.util.CSUtils;

public abstract class BaseESIndexer implements IIndexing<JsonBean> {

    private final static Log LOG = LogFactory.getLog(BaseESIndexer.class);

    @Autowired
    protected ESNodeWrapper esNode;

    protected String indexName = null;
    protected int bulkSize = 100;
    protected int dropThreshold = 10;
    protected final AtomicInteger onGoingBulks = new AtomicInteger();
    protected int bulkOperations = 0;

    /**
     * This method should be used to initialize the Indexer once the ESNodeWrapper has been added.
     */
    protected void init() {
        // new indexes should only be created by (nightly, weekly) indexing processes
        /* try {
            OpenIndexResponse resp = esNode.getClient()
                    .admin()
                    .indices().prepareOpen(indexName).execute().actionGet();
        } catch (ElasticSearchException e) {
            LOG.info("Error opening Index: " + indexName + ", will now create that index.");
            e.printStackTrace();
            // create index if one is not already present.
            ESAdmin.createIndex(esNode, indexName);
        } */
    }

    public void clear() {
        DeleteByQueryResponse deleteResp = esNode.getClient().prepareDeleteByQuery(indexName)
                .setQuery(new MatchAllQueryBuilder())
                .execute()
                .actionGet();
    }

    public Map load(Integer contactId) {
        String queryStr = "id:" + contactId;
        QueryStringQueryBuilder queryBuilder = new QueryStringQueryBuilder(queryStr);
        SearchRequestBuilder searchReq = esNode.getClient().prepareSearch().setIndices(this.getIndexName())
                .setQuery(queryBuilder);
        SearchResponse searchResponse = searchReq.execute().actionGet();
        SearchHit[] hits = searchResponse.getHits().getHits();
        if (hits == null || hits.length == 0) {
            return null;
        }
        Map<String, Object> result = hits[0].getSource();
        return result;
    }

    public void remove(int id) {
        LOG.info("Starting to remove id[" + id + "]");
        String queryStr = "id:" + id;
        QueryStringQueryBuilder queryBuilder = new QueryStringQueryBuilder(queryStr);
        DeleteByQueryResponse deleteResp = esNode.getClient().prepareDeleteByQuery(indexName)
                .setQuery(queryBuilder)
                .execute().actionGet();
        Map<String, IndexDeleteByQueryResponse> indexMap = deleteResp.indices();
        if (indexMap == null) {
            return;
        }
        for (String index : indexMap.keySet()) {
            IndexDeleteByQueryResponse resp = indexMap.get(index);
            if (resp.failedShards() > 0) {
                LOG.warn("Error deleting id[" + id + "]");
            }
        }
        LOG.info("Finished removing id[" + id + "]");
    }

    public void refresh() {
        RefreshRequest request = new RefreshRequest(indexName);
        esNode.getClient().admin().indices().refresh(request).actionGet();
    }

    /**
     * This should only be used for tens or at most hundreds of deletes.
     * For more than this use removeBatchBulk.
     */
    public void removeBatch(Collection<Integer> ids) {
        for (int id : ids) {
            Map indexObject = load(id);
            if (indexObject != null) {
                remove(id);
            }
        }
    }

    public void removeBatchBulk(Collection<Integer> ids) {
        LOG.info("Starting removeBatch of " + ids.size() + " objects on " + getIndexName());
        BulkRequestBuilder bulkOperation = esNode.getClient().prepareBulk();
        for (int id : ids) {
            Map indexObject = load(id);
            if (indexObject != null) {
                String type = null; // ????? where to get this in a delete operation !!!
                Boolean createNew = bulkDelete(id, bulkOperation, type);
                if (createNew) {
                    bulkOperation = esNode.getClient().prepareBulk();
                }
            }
        }
        finishBulk(bulkOperation, "removeBatch");
        LOG.info("Finished removeBatch of " + ids.size() + " objects on " + getIndexName());
    }

    public void saveBatch(Collection<JsonBean> beans) {
        LOG.info("Starting saveBatch indexing of " + beans.size() + " " + getIndexName() + " documents.");
        BulkRequestBuilder bulkOperation = esNode.getClient().prepareBulk();
        for (JsonBean jsonBean : beans) {
            Boolean createNew = bulkSave(jsonBean, bulkOperation);
            if (createNew) {
                bulkOperation = esNode.getClient().prepareBulk();
            }
        }
        // index the remaining stragglers
        finishBulk(bulkOperation, "saveBatch");
        LOG.info("Finished saveBatch indexing of " + beans.size() + " " + getIndexName() + " documents.");
    }

    private void finishBulk(BulkRequestBuilder bulkOperation, String desc) {
        int oldBulkSize = bulkSize;
        bulkSize = 1;
        LOG.info(desc + " Index remaining " + bulkOperation.numberOfActions() + " " + getIndexName() + " operations.");
        processBulkIfNeeded(bulkOperation, bulkOperation.numberOfActions() + bulkOperations);
        bulkSize = oldBulkSize;
    }

    public void save(JsonBean party) {
        IndexResponse postResponse = esNode.getClient().prepareIndex()
                .setIndex(indexName)
                .setType(party.getIndexType())
                .setId("" + party.getId())
                .setSource(party.getJson())
                .setRefresh(true)
                .execute().actionGet();
    }

    public void saveOrUpdateBatch(Collection<JsonBean> parties) {
        LOG.info("Starting saveOrUpdateBatch indexing of " + parties.size() + " " + getIndexName() + " documents.");
        BulkRequestBuilder bulkOperation = esNode.getClient().prepareBulk();
        for (JsonBean jsonBean : parties) {
            GetResponse getResponse = esNode.getClient()
                    .prepareGet(jsonBean.getIndexName(), jsonBean.getIndexType(), "" + jsonBean.getId())
                    .execute().actionGet();
            if (getResponse != null) {
                Boolean createNew = bulkDelete(jsonBean, bulkOperation);
                if (createNew) {
                    bulkOperation = esNode.getClient().prepareBulk();
                }
            }
            Boolean createNew = bulkSave(jsonBean, bulkOperation);
            if (createNew) {
                bulkOperation = esNode.getClient().prepareBulk();
            }
        }
        // index the remaining stragglers
        finishBulk(bulkOperation, "saveOrUpdateBatch");
        LOG.info("Finished saveOrUpdateBatch indexing of " + parties.size() + " " + getIndexName() + " documents.");
    }

    public void saveOrUpdate(JsonBean party) {
        /*CompassSession cmpSess = null;
        CompassTransaction tx = null;
        try {
            cmpSess = compass.openSession();
            tx = cmpSess.beginTransaction();
            try {
                cmpSess.load("accountindex", party.getParty_id());
                cmpSess.delete("accountindex", party.getParty_id());
            } catch (CompassException ignore) {}
            cmpSess.save((AccountIndex) party);
            tx.commit();
        } catch (CompassException ce) {
            handleException(tx, ce);
        } finally {
            cmpSess.close();
        }*/
    }

    /*@Override
    public CompassHits search(String query) {
        CompassSession cmpSess = null;
        CompassHits hits = null;
        if (query != null) {
            try {
                cmpSess = compass.openSession();
                hits = cmpSess.find(query);
            } catch (CompassException ce) {
                LOG.error("searchExtParty raises exception: ", ce);
            } finally {
                cmpSess.close();
            }
        }
        return hits;
    }*/

    public List<JsonBean> searchList(String query) {
        List<JsonBean> results = new ArrayList<JsonBean>();
        /*CompassSession cmpSess = null;
        if (query != null) {
            try {
                cmpSess = compass.openSession();
                CompassHits hits = cmpSess.find(query);
                for (int i = 0; i < hits.getLength(); i++) {
                    results.add((AccountIndex) hits.data(i));
                }
            } catch (CompassException ce) {
                LOG.error("searchIoi raises exception: ", ce);
            } finally {
                cmpSess.close();
            }
        }*/
        return results;
    }

    public List<JsonBean> loadAllList() {
        /*ArrayList<String> results = new ArrayList<String>();
        SearchResponse response = esNode.getClient().prepareSearch(indexName).setSearchType(indexType).execute().actionGet();
        SearchHits hitsObj = response.getHits();
        return hitsObj;*/
        throw new UnsupportedOperationException("Returning millions of results in a list is not supported; query through the SearchHits API instead.");
    }

    /*@Override
    public CompassHits loadAll() {
        CompassHits results = search("alias:accountindex");
        return results;
    }*/

    public void CUDParties(List<Map<String, Object>> contactsLst) {
        // TODO Auto-generated method stub
    }

    public Boolean bulkDelete(Integer id, BulkRequestBuilder bulkOperation, String type) {
        DeleteRequestBuilder del = esNode.getClient().prepareDelete().setIndex(getIndexName()).setId(id.toString()).setType(type);
        bulkOperation.add(del);
        Boolean createNewBulkOperation = processBulkIfNeeded(bulkOperation, bulkOperations++);
        return createNewBulkOperation;
    }

    public Boolean bulkDelete(JsonBean jsonBean, BulkRequestBuilder bulkOperation) {
        DeleteRequestBuilder del = esNode.getClient().prepareDelete(indexName, jsonBean.getIndexType(), "" + jsonBean.getId());
        bulkOperation.add(del);
        Boolean createNewBulkOperation = processBulkIfNeeded(bulkOperation, bulkOperations++);
        return createNewBulkOperation;
    }

    public Boolean bulkSave(JsonBean jsonBean, BulkRequestBuilder bulkOperation) {
        IndexRequest indexOperation = Requests.indexRequest(indexName)
                .type(jsonBean.getIndexType())
                .id("" + jsonBean.getId())
                .create(false)
                .source(jsonBean.getJson());
        bulkOperation.add(indexOperation);
        Boolean createNewBulkOperation = processBulkIfNeeded(bulkOperation, bulkOperations++);
        return createNewBulkOperation;
    }

    private Boolean processBulkIfNeeded(BulkRequestBuilder bulkOperation, int bulkOperations) {
        Boolean createNewBulkReq = false;
        if (bulkOperation.numberOfActions() >= bulkSize) {
            // execute the bulk operation
            int currentOnGoingBulks = onGoingBulks.incrementAndGet();
            LOG.info("Ongoing Bulks = " + currentOnGoingBulks + ", Index Operations = " + bulkOperations);
            if (currentOnGoingBulks > dropThreshold) {
                // TODO: just wait here instead; we could slow down the parsing
                onGoingBulks.decrementAndGet();
                String message = "dropping bulk, " + onGoingBulks + " crossed threshold " + dropThreshold + ", Index Operations = " + bulkOperations;
                LOG.error(message);
                registerError(new Exception(message));
            } else {
                try {
                    final int bulkNo = onGoingBulks.get();
                    LOG.info("Executing Bulk Request " + bulkNo + " for index " + getIndexName());
                    bulkOperation.execute(new ActionListener<BulkResponse>() {
                        @Override public void onResponse(BulkResponse bulkResponse) {
                            onGoingBulks.decrementAndGet();
                            LOG.info("Bulk [" + bulkNo + "] of " + getIndexName() + " Index Executed");
                            if (bulkResponse.hasFailures()) {
                                RuntimeException indexingException = new RuntimeException(bulkResponse.buildFailureMessage());
                                registerError(indexingException);
                            }
                        }
                        @Override public void onFailure(Throwable indexingException) {
                            Exception exception = new Exception(indexingException);
                            registerError(exception);
                        }
                    });
                } catch (Exception indexingException) {
                    registerError(indexingException);
                }
            }
            // once we have executed a bulk request, create a new one for adding
            // a fresh set of bulk updates, deletes, additions etc.
            createNewBulkReq = true;
        }
        return createNewBulkReq;
    }

    private void registerError(Exception indexingException) {
        BuildIndex.statusFlag = BuildIndex.ERROR;
        BuildIndex.exception = indexingException;
        LOG.error("Indexing Exception", indexingException);
    }

    public String getIndexName() {
        return indexName;
    }

    public void setIndexName(String indexName) {
        this.indexName = indexName;
    }

    public int getBulkSize() {
        return bulkSize;
    }

    public void setBulkSize(int bulkSize) {
        this.bulkSize = bulkSize;
    }

    public int getDropThreshold() {
        return dropThreshold;
    }

    public void setDropThreshold(int dropThreshold) {
        this.dropThreshold = dropThreshold;
    }

    public AtomicInteger getOnGoingBulks() {
        return onGoingBulks;
    }

    public ESNodeWrapper getEsNode() {
        return esNode;
    }
}
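The pattern above accumulates index/delete operations on a BulkRequestBuilder, flushes once numberOfActions() reaches bulkSize, and uses an AtomicInteger to cap how many asynchronous bulk requests are in flight. Below is a minimal, synchronous sketch of the same flush-by-size idea, using only the client calls that appear in the gist; the Client handle and the JsonBean list are assumed to be supplied by the caller, and this is an illustration rather than the gist's exact code.

package ****.elasticsearch.index.example; // hypothetical package, for illustration only

import java.util.Collection;

import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.Requests;
import org.elasticsearch.client.action.bulk.BulkRequestBuilder;

import ****.index.bean.JsonBean;

public class SimpleBulkIndexSketch {

    /** Index the beans in synchronous bulks of at most bulkSize actions. */
    public static void bulkIndex(Client client, String indexName, Collection<JsonBean> beans, int bulkSize) {
        BulkRequestBuilder bulk = client.prepareBulk();
        for (JsonBean bean : beans) {
            bulk.add(Requests.indexRequest(indexName)
                    .type(bean.getIndexType())
                    .id("" + bean.getId())
                    .source(bean.getJson()));
            if (bulk.numberOfActions() >= bulkSize) {
                flush(bulk);                      // flush a full bulk
                bulk = client.prepareBulk();      // start a fresh request builder
            }
        }
        if (bulk.numberOfActions() > 0) {
            flush(bulk);                          // flush the remaining stragglers
        }
    }

    private static void flush(BulkRequestBuilder bulk) {
        // Synchronous execution, unlike the throttled async listener used in BaseESIndexer.
        BulkResponse response = bulk.execute().actionGet();
        if (response.hasFailures()) {
            throw new RuntimeException(response.buildFailureMessage());
        }
    }
}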
====================================================================================================================
package ****.index.bean;

public class JsonBean {

    private String json;
    private int id;
    private String indexName;
    private String indexType;

    public JsonBean() {}

    public JsonBean(String json, int id, String indexName, String indexType) {
        this.json = json;
        this.id = id;
        this.indexName = indexName;
        this.indexType = indexType;
    }

    public String getJson() {
        return json;
    }

    public void setJson(String json) {
        this.json = json;
    }

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getIndexName() {
        return indexName;
    }

    public void setIndexName(String indexName) {
        this.indexName = indexName;
    }

    public String getIndexType() {
        return indexType;
    }

    public void setIndexType(String indexType) {
        this.indexType = indexType;
    }
}
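JsonBean is just a holder for a pre-serialized JSON string plus the routing information (id, index name, index type) that BaseESIndexer needs. Below is a small sketch of producing one with the Jackson 1.x ObjectMapper used elsewhere in the gist; the Contact/ContactJSONWrapper classes are not included, so a placeholder POJO stands in here.

package ****.index.bean.example; // hypothetical package, for illustration only

import org.codehaus.jackson.map.ObjectMapper;

public class JsonBeanFactorySketch {

    private static final ObjectMapper MAPPER = new ObjectMapper();

    /** Placeholder document; stands in for ContactJSONWrapper, which is not part of the gist. */
    public static class ContactDoc {
        private final int id;
        private final String name;

        public ContactDoc(int id, String name) {
            this.id = id;
            this.name = name;
        }

        public int getId() { return id; }
        public String getName() { return name; }
    }

    /** Serialize the document once, up front, and carry the JSON string in the bean. */
    public static JsonBean toJsonBean(ContactDoc doc, String indexName, String indexType) throws Exception {
        String json = MAPPER.writeValueAsString(doc);
        return new JsonBean(json, doc.getId(), indexName, indexType);
    }
}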
===============================================================================================================================
package ****.elasticsearch.buildIndex;

import static ****.elasticsearch.util.ESConstants.ALIAS_CONTACT;
import static ****.elasticsearch.util.ESConstants.INDEX_CONTACT;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.codehaus.jackson.JsonGenerationException;
import org.codehaus.jackson.map.JsonMappingException;
import org.codehaus.jackson.map.ObjectMapper;
import org.springframework.beans.factory.annotation.Autowired;

import ****.elasticsearch.index.ESContactIndexer;
import ****.elasticsearch.index.IndexCleanup;
import ****.elasticsearch.util.ESAdmin;
import ****.index.bean.Contact;
import ****.index.bean.ContactJSONWrapper;
import ****.index.bean.JsonBean;
import ****.index.compass.QueuedIndexingThread;
import ****.index.jdbc.BaseContactJDBCIndexer;
import ****.index.jdbc.BuildIndex;
import ****.index.jdbc.IntegerRowMapper;
import ****.index.jdbc.JDBCIndexer;
import ****.index.jdbc.JDBCQueries;
import ****.index.jdbc.JDBCQueryHelper;
import ****.util.CSUtils;

public class ESContactJDBCIndexer extends BaseContactJDBCIndexer implements JDBCIndexer, ElasticSearchIndexer { // was JdbcDaoSupport

    final static Log LOG = LogFactory.getLog(ESContactJDBCIndexer.class);

    QueuedIndexingThread<JsonBean> indexThread = new QueuedIndexingThread<JsonBean>("ESContact Indexer");

    @Autowired(required = true) ESContactIndexer esContactIndexing;

    public void init() {
    }

    public void doIndex() throws Exception {
        IndexCleanup.cleanup(esContactIndexing.getEsNode(), "Contact JDBC");
        Thread.sleep(1000);
        String oldIndexName = getOldIndexName();
        LOG.info("Old Index Name[" + oldIndexName + "]");
        String newIndexName = createNewIndex();
        LOG.info("New Index Name[" + newIndexName + "]");
        esContactIndexing.setIndexName(newIndexName);
        // This is the big data one
        getDBDataAndCreateNewIndex();
        LOG.info("Removing alias [" + ALIAS_CONTACT + "] for [" + oldIndexName + "]");
        removeAliasOldIndex(oldIndexName, ALIAS_CONTACT);
        LOG.info("Adding alias [" + ALIAS_CONTACT + "] for [" + newIndexName + "]");
        aliasNewIndex(newIndexName, ALIAS_CONTACT);
        // ?????? runWarmUpQueries ????? // Lucene 3.0.3 (I think) needs one-off warm-up queries.
        Thread.sleep(1000);
        LOG.info("Deleting index [" + oldIndexName + "]");
        deleteOldIndex(oldIndexName);
    }

    @Override
    public void removeAliasOldIndex(String oldIndexName, String aliasContact) {
        ESAdmin.removeAlias(esContactIndexing.getEsNode(), oldIndexName, aliasContact);
    }

    @Override
    public void aliasNewIndex(String newIndexName, String aliasContact) {
        ESAdmin.addAlias(esContactIndexing.getEsNode(), newIndexName, aliasContact);
    }

    @Override
    public void deleteOldIndex(String oldIndexName) {
        ESAdmin.deleteIndex(esContactIndexing.getEsNode(), oldIndexName);
    }

    @Override
    public String createNewIndex() {
        return EsJdbcHelper.createNewIndexOfType(INDEX_CONTACT, esContactIndexing.getEsNode());
    }

    @Override
    public String getOldIndexName() {
        return EsJdbcHelper.getOldIndexNameOfType(INDEX_CONTACT, esContactIndexing.getEsNode());
    }

    @Override
    public void getDBDataAndCreateNewIndex() throws Exception, InterruptedException {
        List<Integer> allPersonPartyIds = getPersonIds();
        int size = allPersonPartyIds.size();
        if (size == 0) return;
        indexThread.setIndexingTools(esContactIndexing);
        indexThread.setStopWhenEmpty(false);
        indexThread.setUpdateMode(false);
        LOG.info("*** Number of contacts to Index = " + size);
        // set chunk size to give about 5000 documents per chunk
        int numberOfChunks = size / 5000;
        LOG.info("numberOfChunks = " + numberOfChunks);
        List<JDBCQueryHelper.HiLowId> idBetweenList = JDBCQueryHelper.createBetweenIdList(allPersonPartyIds, numberOfChunks);
        int count = 0;
        for (JDBCQueryHelper.HiLowId hiLo : idBetweenList) {
            if (BuildIndex.statusFlag == BuildIndex.ERROR) {
                LOG.error("Error in index building, stopping now.", BuildIndex.exception);
                return;
            }
            Collection<Contact> parties = mapContacts(hiLo);
            indexCollection(parties);
            ESAdmin.getStats(esContactIndexing.getEsNode(), LOG);
            LOG.info("Pass number " + ++count + " executed.");
        }
        Thread.sleep(2000);
        indexThread.setStopWhenEmpty(true);
        try {
            Thread.sleep(5000);
            while (indexThread.isStillRunning()) {
                Thread.sleep(5000);
            }
        } catch (InterruptedException e) {}
    }

    public List<Integer> getPersonIds() {
        List<Integer> ids = getJdbcTemplate().query(
                JDBCQueries.GET_ALL_PERSON_IDS_PROC, new IntegerRowMapper());
        return ids;
    }

    public void setExtPartyIndexing(ESContactIndexer contactIndexing) {
        this.esContactIndexing = contactIndexing;
    }

    public void doIndexLimited(String contactIds) throws Exception {
        esContactIndexing.setIndexName(ALIAS_CONTACT);
        LOG.info("Contact Ids[" + contactIds + "]");
        if (contactIds == null || "".equals(contactIds)) {
            LOG.warn("Empty contactIds! Returning now.");
            return;
        }
        Collection<Contact> parties = mapContactsLimited(contactIds);
        List<JsonBean> jsonList = getCollectionAsJsonBeanList(parties);
        esContactIndexing.saveOrUpdateBatch(jsonList);
        // refresh the index before returning; we do it TWICE deliberately
        esContactIndexing.refresh();
        esContactIndexing.refresh();
    }

    void indexCollection(Collection<Contact> parties) throws Exception {
        List<JsonBean> jsonList = getCollectionAsJsonBeanList(parties);
        LOG.info("created " + jsonList.size() + " contact json beans.");
        indexJSONCollection(jsonList);
    }

    private List<JsonBean> getCollectionAsJsonBeanList(
            Collection<Contact> parties) throws IOException,
            JsonGenerationException, JsonMappingException {
        List<JsonBean> jsonList = new ArrayList<JsonBean>(parties.size());
        int i = 0;
        for (Contact contact : parties) {
            ContactJSONWrapper wrapperForLessVerboseJson = new ContactJSONWrapper(contact);
            String json1 = CSUtils.mapper().writeValueAsString(wrapperForLessVerboseJson);
            if (json1 != null) {
                if (i == 0) {
                    LOG.info("contact json = " + json1);
                    i++;
                }
                JsonBean jsonBean = new JsonBean(json1, contact.getPartyId(), esContactIndexing.getIndexName(), contact.getType());
                jsonList.add(jsonBean);
            }
        }
        return jsonList;
    }

    private void indexJSONCollection(Collection<JsonBean> parties) {
        indexThread.addToIndexQueAndIncrementBatchNumber(parties);
        if (!indexThread.isStarted()) {
            indexThread.start();
        }
    }
}
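getDBDataAndCreateNewIndex splits the full id list into roughly 5000-document chunks via JDBCQueryHelper.createBetweenIdList, then maps and queues one chunk at a time so the whole contact table is never held in memory. JDBCQueryHelper is not included in the gist; the sketch below is a hypothetical reconstruction of that low/high id chunking, with HiLowRange standing in for JDBCQueryHelper.HiLowId.

package ****.index.jdbc.example; // hypothetical package, for illustration only

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class IdChunkingSketch {

    /** Hypothetical stand-in for JDBCQueryHelper.HiLowId: an inclusive id range. */
    public static class HiLowRange {
        public final int low;
        public final int high;

        public HiLowRange(int low, int high) {
            this.low = low;
            this.high = high;
        }
    }

    /** Split a list of ids into numberOfChunks contiguous low/high ranges (assumes the ids can be sorted). */
    public static List<HiLowRange> createBetweenIdList(List<Integer> ids, int numberOfChunks) {
        List<Integer> sorted = new ArrayList<Integer>(ids);
        Collections.sort(sorted);
        int chunks = Math.max(1, numberOfChunks);
        int chunkSize = (int) Math.ceil((double) sorted.size() / chunks);
        List<HiLowRange> ranges = new ArrayList<HiLowRange>(chunks);
        for (int from = 0; from < sorted.size(); from += chunkSize) {
            int to = Math.min(from + chunkSize, sorted.size()) - 1;
            ranges.add(new HiLowRange(sorted.get(from), sorted.get(to)));
        }
        return ranges;
    }
}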
==============================================================================================================================
package ****.index.compass;

import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public class QueuedIndexingThread<T> extends Thread {

    private String description;

    /**
     * Use this if you do NOT know how many batches you need to index at
     * instantiation time. Add 'chunks' with "addToIndexQueAndIncrementBatchNumber";
     * do not use "addToIndexQue".
     *
     * Note: this way seems to be slower than using a predefined chunk count,
     * perhaps because we are changing variable values in the middle of a thread,
     * causing more context switching.
     *
     * @param description
     */
    public QueuedIndexingThread(String description) {
        this.numberOfBatchesToIndex = 0;
        this.isPreConfiguredChunkSize = false;
        this.description = description;
    }

    /**
     * Use this if you know how many batches you need to index at instantiation
     * time. Add 'chunks' with "addToIndexQue"; do not use
     * "addToIndexQueAndIncrementBatchNumber".
     *
     * @param numberOfBatchesToIndex
     * @param description
     */
    public QueuedIndexingThread(int numberOfBatchesToIndex, String description) {
        this.numberOfBatchesToIndex = numberOfBatchesToIndex;
        this.isPreConfiguredChunkSize = true;
        this.description = description;
    }

    private final static Log LOG = LogFactory.getLog(QueuedIndexingThread.class);

    private IIndexing<T> indexing;
    int numberOfBatchesToIndex = 0;
    private boolean isStarted = false;
    private boolean doNotStop = true;
    private boolean stopWhenEmpty = true;
    private Boolean isPreConfiguredChunkSize;
    private Boolean updateMode;
    private int countIndexesFinished;

    public Boolean getUpdateMode() {
        return updateMode;
    }

    public void setUpdateMode(Boolean updateMode) {
        this.updateMode = updateMode;
    }

    private LinkedList<Collection<T>> que = new LinkedList<Collection<T>>();

    @Override
    public void run() {
        isStarted = true;
        LOG.info("Starting Indexing " + description);
        while (doNotStop) {
            try {
                Collection<T> indexAble = que.poll();
                if (indexAble != null) {
                    LOG.info(String.format("Starting Indexing [ %s ] Batch number [ %s ]", description, ++countIndexesFinished));
                    if (updateMode) {
                        indexing.saveOrUpdateBatch(indexAble);
                    } else {
                        // During a full re-index do not update in place: index into a new index,
                        // delete the old one, then alias the new index to a constant alias name.
                        indexing.saveBatch(indexAble);
                    }
                    indexAble.clear();
                    indexAble = null;
                    LOG.info(String.format("Finished Indexing [ %s ] Batch number [ %s ]", description, countIndexesFinished));
                }
                Thread.sleep(1000);
                doNotStop = (countIndexesFinished < numberOfBatchesToIndex) || !stopWhenEmpty || que.size() > 0;
            } catch (InterruptedException ex) {
                throw new RuntimeException(ex);
            }
        }
        LOG.info("Shutting down Indexer " + this);
        try {
            Thread.sleep(25000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public void addToIndexQue(Collection<T> indexableChunk) {
        if (!isPreConfiguredChunkSize) {
            throw new IllegalStateException("If you have not pre-defined your chunk size use 'addToIndexQueAndIncrementBatchNumber' to add chunks");
        }
        que.add(indexableChunk);
    }

    /**
     * Note: this way seems to be slower than using a predefined chunk count,
     * perhaps because we are changing variable values in the middle of a thread,
     * causing more context switching.
     *
     * @param indexableChunk
     */
    public void addToIndexQueAndIncrementBatchNumber(Collection<T> indexableChunk) {
        if (isPreConfiguredChunkSize) {
            throw new IllegalStateException("If you have pre-defined your chunk size use 'addToIndexQue' to add chunks");
        }
        this.numberOfBatchesToIndex++;
        que.add(indexableChunk);
        LOG.info(String.format(" ADDED BATCH [ %s ] ", this.numberOfBatchesToIndex));
    }

    public boolean isStarted() {
        return isStarted;
    }

    public void setIndexingTools(IIndexing<T> indexing) {
        this.indexing = indexing;
    }

    public boolean isStillRunning() {
        return doNotStop;
    }

    public String stopStatus() {
        return String.format("doNotStop [ %s ] countIndexesFinished < numberOfBatchesToIndex [ %s ], !stopWhenEmpty [ %s ], que.size [ %s ]",
                doNotStop, (countIndexesFinished < numberOfBatchesToIndex), !stopWhenEmpty, que.size());
    }

    /**
     * Set this to false to keep the thread going because we expect more data to
     * be added later. Once all data has been added you must manually set this
     * back to true or the thread will continue forever.
     *
     * @param stopWhenEmpty
     */
    public void setStopWhenEmpty(boolean stopWhenEmpty) {
        this.stopWhenEmpty = stopWhenEmpty;
    }
}
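ESContactJDBCIndexer drives this thread in the "unknown batch count" mode: construct it with a description only, disable stopWhenEmpty while chunks are still being produced, add each chunk with addToIndexQueAndIncrementBatchNumber, then re-enable stopWhenEmpty and poll isStillRunning(). Below is a condensed sketch of that driving sequence, assuming an IIndexing<JsonBean> implementation (for example a BaseESIndexer subclass) and a source of pre-built batches are already wired up.

package ****.index.compass.example; // hypothetical package, for illustration only

import java.util.List;

import ****.index.bean.JsonBean;
import ****.index.compass.IIndexing;
import ****.index.compass.QueuedIndexingThread;

public class QueuedIndexingDriverSketch {

    /** Feed pre-built batches through a QueuedIndexingThread and wait for it to drain. */
    public static void indexBatches(IIndexing<JsonBean> indexer, List<List<JsonBean>> batches) throws InterruptedException {
        if (batches.isEmpty()) {
            return; // nothing to do; mirrors the size == 0 early return in getDBDataAndCreateNewIndex
        }

        QueuedIndexingThread<JsonBean> thread = new QueuedIndexingThread<JsonBean>("Example Indexer");
        thread.setIndexingTools(indexer);
        thread.setStopWhenEmpty(false);   // keep running while we are still adding batches
        thread.setUpdateMode(false);      // full re-index: saveBatch rather than saveOrUpdateBatch

        for (List<JsonBean> batch : batches) {
            thread.addToIndexQueAndIncrementBatchNumber(batch);
            if (!thread.isStarted()) {
                thread.start();
            }
        }

        thread.setStopWhenEmpty(true);    // no more batches coming; let the thread stop once the queue drains
        while (thread.isStillRunning()) {
            Thread.sleep(5000);
        }
    }
}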
======================================================================================================