ElasticSearch Bulk Processor
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

import javax.annotation.PostConstruct;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import io.searchbox.client.JestClient;
import io.searchbox.core.Bulk;
import io.searchbox.core.BulkResult;
import io.searchbox.core.Index;

/**
 * A component that takes care of bulking Elasticsearch indexing requests to accommodate near-real-time indexing.
 *
 * It executes the indexing requests when either a time limit or a size limit is reached,
 * and it avoids blocking the thread that submits the records to be indexed.
 */
@Component
public class ESBulkProcessor {

    private static final Logger logger = LoggerFactory.getLogger(ESBulkProcessor.class);
    private static final int FIRST_NON_SUCCESS_HTTP_STATUS = 300;

    @Autowired
    private JestClient elasticSearchClient;

    @Value("${es.bulk.events.limit}")
    private int limit;

    @Value("${es.bulk.processor.rate.seconds}")
    private int rate;

    @Value("${es.bulk.indexing.threads}")
    private int indexingThreads;
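    // Example configuration for the three properties above (illustrative values, not part of the gist):
    //   es.bulk.events.limit=1000
    //   es.bulk.processor.rate.seconds=5
    //   es.bulk.indexing.threads=2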

    private final ReentrantLock lock = new ReentrantLock();

    /**
     * The same executor is used for both the time-bound and the count-bound triggers.
     */
    private ScheduledExecutorService executorService;

    private final ConcurrentLinkedQueue<Index> indexRequestsQueue = new ConcurrentLinkedQueue<>();
    private final AtomicInteger count = new AtomicInteger();

    @PostConstruct
    public void init() {
        executorService = Executors.newScheduledThreadPool(indexingThreads);
        executorService.scheduleAtFixedRate(this::flush, 0, rate, TimeUnit.SECONDS);
    }

    public void flush() {
        try {
            if (!indexRequestsQueue.isEmpty()) {
                // No need to lock here: even if multiple threads poll the queue concurrently,
                // each thread fetches its own set of items and flushes them separately.
                flush(createBulkRequest());
            }
        } catch (Exception ex) {
            logger.error("Failed to flush bulk processor", ex);
        }
    }

    private void flush(Bulk.Builder bulkRequest) {
        try {
            BulkResult result = elasticSearchClient.execute(bulkRequest.build());
            checkForFailure(result);
        } catch (Exception e) {
            logger.error("Failed to index entities {}", bulkRequest, e);
        }
    }

    public static void checkForFailure(BulkResult result) {
        if (!result.isSucceeded()) {
            for (BulkResult.BulkResultItem item : result.getItems()) {
                // Ignore errors caused by attempts to index entries that are already indexed
                if (item.status >= FIRST_NON_SUCCESS_HTTP_STATUS
                        && !item.errorType.equalsIgnoreCase("version_conflict_engine_exception")) {
                    throw new IllegalArgumentException("Elasticsearch failed to index item. Error="
                            + item.error + ", " + item.errorReason + ", id=" + item.id);
                }
            }
        }
    }

    private Bulk.Builder createBulkRequest() {
        Bulk.Builder builder = new Bulk.Builder();
        int batchSize = 0;
        // Poll at most `limit` items; polling only while there is room in the batch
        // ensures no item is taken off the queue and then silently dropped.
        while (batchSize < limit) {
            Index currentIndexRequest = indexRequestsQueue.poll();
            if (currentIndexRequest == null) {
                break;
            }
            builder.addAction(currentIndexRequest);
            count.decrementAndGet();
            batchSize++;
        }
        return builder;
    }

    public void addAction(Index index) {
        indexRequestsQueue.add(index);
        count.incrementAndGet();
        if (count.get() >= limit) {
            // Locking so that only one thread at a time creates a bulk by polling the queue.
            // Note that the thread pool has multiple threads because the indexing itself
            // is a slow operation (while forming the batch from in-memory items is fast),
            // so even though fetching the items is serialized by the lock, multi-threaded
            // flushing still pays off.
            if (lock.tryLock()) {
                try {
                    Bulk.Builder bulkRequest = createBulkRequest();
                    executorService.submit(() -> flush(bulkRequest));
                } finally {
                    lock.unlock();
                }
            }
        }
    }
}
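For context, here is a minimal usage sketch, not part of the original gist: a hypothetical Spring service hands Jest Index actions to the processor and returns immediately, letting the size limit or the next scheduled run trigger the actual bulk request. The EventIndexer class, the EventDocument type, and the "events" index/type names are illustrative assumptions.

@Service
public class EventIndexer {

    @Autowired
    private ESBulkProcessor bulkProcessor;

    public void onEvent(EventDocument event) {
        // Build a Jest Index action and hand it off; the processor batches it and flushes
        // either when the size limit is reached or on the next scheduled run, so the
        // caller never waits for the Elasticsearch round-trip.
        Index indexRequest = new Index.Builder(event)
                .index("events")
                .type("event")
                .id(event.getId())
                .build();
        bulkProcessor.addAction(indexRequest);
    }
}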