Skip to content

Instantly share code, notes, and snippets.

What would you like to do?
ElasticSearch Bulk Processor
/**
 * A component that batches Elasticsearch indexing requests to support near-real-time indexing.
 * It executes the queued indexing requests when either a time limit or a size limit is reached.
 * It avoids blocking the thread that submits the records to be indexed.
 */
public class ESBulkProcessor {
// Class-wide logger.
private static final Logger logger = LoggerFactory.getLogger(ESBulkProcessor.class);
// NOTE(review): not referenced in the visible code; judging by the name, presumably the
// lowest HTTP status treated as a failure — confirm against the full source.
private static final int FIRST_NON_SUCCESS_HTTP_STATUS = 300;
// Client used to execute the bulk requests.
private JestClient elasticSearchClient;
// Size threshold: addAction compares the pending count against it, and createBulkRequest
// uses it as the upper bound of its polling loop.
private int limit;
// Period, in seconds, of the scheduled flush registered in init().
private int rate;
// Number of threads in the scheduled thread pool created in init().
private int indexingThreads;
// Acquired with tryLock() in addAction before a batch is built from the queue.
private ReentrantLock lock = new ReentrantLock();
// The same executor is used for both the time-bound and the count-bound triggers.
// Thread pool that runs the scheduled flush and the flushes submitted from addAction;
// it is created in init().
private ScheduledExecutorService executorService;
// Pending index requests waiting to be polled into a bulk request.
private ConcurrentLinkedQueue<Index> indexRequestsQueue = new ConcurrentLinkedQueue<>();
// Counter that addAction compares against limit to decide whether a size-triggered flush is due.
private AtomicInteger count = new AtomicInteger();
/**
 * Creates the scheduled thread pool and registers the periodic flush.
 * The pool is sized by {@code indexingThreads}; {@link #flush()} is scheduled with no
 * initial delay and a period of {@code rate} seconds.
 */
public void init() {
executorService = Executors.newScheduledThreadPool(indexingThreads);
executorService.scheduleAtFixedRate(this::flush, 0, rate, TimeUnit.SECONDS);
/**
 * Time-triggered flush, scheduled from init().
 * It acts only when the queue holds pending requests; any exception is logged and not rethrown.
 */
public void flush() {
try {
if (!indexRequestsQueue.isEmpty()) {
// no need to lock here: even if another thread is fetching from the queue concurrently,
// each thread fetches its own set of items and flushes them separately
// NOTE(review): the statements that build and send the batch are not visible in this excerpt.
} catch (Exception ex) {
// catch-all: the failure is logged, not rethrown
logger.error("Failed to flush bulk processor", ex);
/**
 * Executes a bulk request through the Elasticsearch client.
 * Any exception is caught and logged together with the request.
 *
 * @param bulkRequest builder for the bulk request to execute
 */
private void flush(Bulk.Builder bulkRequest) {
try {
// NOTE(review): the argument list of execute(...) is truncated in this excerpt.
BulkResult result = elasticSearchClient.execute(;
} catch (Exception e) {
// the request is passed as the log argument and the exception as the throwable
logger.error("Failed to index entities {}", bulkRequest, e);
/**
 * Inspects a bulk result and throws when an item failed for a reason other than a version conflict.
 *
 * @param result the bulk result to inspect
 * @throws IllegalArgumentException for a failed item whose error type is not
 *         {@code version_conflict_engine_exception}
 */
public static void checkForFailure(BulkResult result) {
// items are only inspected when the bulk as a whole is reported as not succeeded
if (!result.isSucceeded()) {
for (BulkResult.BulkResultItem item : result.getItems()) {
// ignore errors related to attempts to index entries that are already indexed
// NOTE(review): the first part of this condition is not visible in this excerpt.
&& !item.errorType.equalsIgnoreCase("version_conflict_engine_exception")) {
// NOTE(review): the end of the message (the id operand) is truncated in this excerpt.
throw new IllegalArgumentException("Elasticsearch failed to index item. Error="
+ item.error + ", " + item.errorReason + ", id=" +;
/**
 * Builds a bulk request by polling index requests off the queue.
 * The loop condition is bounded by {@code limit} and also ends once the queue is drained.
 *
 * @return the bulk request builder
 */
private Bulk.Builder createBulkRequest() {
Bulk.Builder builder = new Bulk.Builder();
// poll() returns null when the queue is empty, which ends the loop below
Index currentIndexRequest = indexRequestsQueue.poll();
int batchSize = 0;
while (batchSize < limit && currentIndexRequest != null) {
// NOTE(review): in this excerpt nothing adds currentIndexRequest to builder or
// increments batchSize — confirm against the full source.
currentIndexRequest = indexRequestsQueue.poll();
return builder;
/**
 * Size-triggered flush path: once the pending count has reached {@code limit}, tries to take
 * the lock, builds a bulk request from the queue and submits its flush to the executor.
 *
 * @param index the index request supplied by the caller
 *              (NOTE(review): where it is enqueued is not visible in this excerpt)
 */
public void addAction(Index index) {
if (count.get() >= limit) {
// locking so that only one thread can create a bulk by polling the queue
// note that the thread pool has a number of threads as the indexing itself
// is a slow operation (and forming the batch from items in memory is fast)
// so even though there is locking on fetching the items, it makes sense
// to have multi-threaded flushing.
// tryLock() returns immediately instead of waiting for the lock
if (lock.tryLock()) {
try {
Bulk.Builder bulkRequest = createBulkRequest();
// the slow indexing call runs on the executor, not on the calling thread
executorService.submit(() -> flush(bulkRequest));
} finally {
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.