Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
ElasticSearch Bulk Processor
/**
* A component that takes care of the bulking of elasticsearch indexing requests to accommodate near-real-time indexing
*
* It executes indexing requests when either a time limit or a size limit is reached.
* It avoids blocking the current thread that inserts the records to be indexed.
*/
@Component
public class ESBulkProcessor {
private static final Logger logger = LoggerFactory.getLogger(ESBulkProcessor.class);
private static final int FIRST_NON_SUCCESS_HTTP_STATUS = 300;
@Autowired
private JestClient elasticSearchClient;
@Value("${es.bulk.events.limit}")
private int limit;
@Value("${es.bulk.processor.rate.seconds}")
private int rate;
@Value("${es.bulk.indexing.threads}")
private int indexingThreads;
private ReentrantLock lock = new ReentrantLock();
/**
* Using the same executor for the time-bound and count-bound triggers
*/
private ScheduledExecutorService executorService;
private ConcurrentLinkedQueue<Index> indexRequestsQueue = new ConcurrentLinkedQueue<>();;
private AtomicInteger count = new AtomicInteger();
@PostConstruct
public void init() {
executorService = Executors.newScheduledThreadPool(indexingThreads);
executorService.scheduleAtFixedRate(this::flush, 0, rate, TimeUnit.SECONDS);
}
public void flush() {
try {
if (!indexRequestsQueue.isEmpty()) {
// no need to lock here as even if there is concurrent fetching from the queue
// both threads will fetch certain number of items and flush them separately
flush(createBulkRequest());
}
} catch (Exception ex) {
logger.error("Failed to flush bulk processor", ex);
}
}
private void flush(Bulk.Builder bulkRequest) {
try {
BulkResult result = elasticSearchClient.execute(bulkRequest.build());
checkForFailure(result);
} catch (Exception e) {
logger.error("Failed to index entities {}", bulkRequest, e);
}
}
public static void checkForFailure(BulkResult result) {
if (!result.isSucceeded()) {
for (BulkResult.BulkResultItem item : result.getItems()) {
// ignore errors related to attempts to index entries that are already indexed
if (item.status >= FIRST_NON_SUCCESS_HTTP_STATUS
&& !item.errorType.equalsIgnoreCase("version_conflict_engine_exception")) {
throw new IllegalArgumentException("Elasticsearch failed to index item. Error="
+ item.error + ", " + item.errorReason + ", id=" + item.id);
}
}
}
}
private Bulk.Builder createBulkRequest() {
Bulk.Builder builder = new Bulk.Builder();
Index currentIndexRequest = indexRequestsQueue.poll();
int batchSize = 0;
while (batchSize < limit && currentIndexRequest != null) {
builder.addAction(currentIndexRequest);
currentIndexRequest = indexRequestsQueue.poll();
count.decrementAndGet();
batchSize++;
}
return builder;
}
public void addAction(Index index) {
indexRequestsQueue.add(index);
count.incrementAndGet();
if (count.get() >= limit) {
// locking so that only one thread can create a bulk by polling the queue
// note that the thread poll has a number of threads as the indexing itself
// is a slow operation (and forming the batch from items in memory is fast)
// so even though there is locking on fetching the items, it makes sense
// to have multi-threaded flushing.
if (lock.tryLock()) {
try {
Bulk.Builder bulkRequest = createBulkRequest();
executorService.submit(() -> flush(bulkRequest));
} finally {
lock.unlock();
}
}
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.