Skip to content

Instantly share code, notes, and snippets.

@abelyansky
Last active August 20, 2019 19:03
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save abelyansky/6f29ef567d1ee149857457248c68a230 to your computer and use it in GitHub Desktop.
Save abelyansky/6f29ef567d1ee149857457248c68a230 to your computer and use it in GitHub Desktop.
/**
 * Buffers a single entry, flushing the current buffer first when adding the
 * entry would reach the per-request entry or byte limit.
 *
 * <p>Before buffering, the method acquires one permit from
 * {@code entriesRatePerShard} and {@code recordSize} permits from
 * {@code bytesRatePerShard}. Any exception raised along the way is logged at
 * warn level and handed to {@code futureCallback.onFailure}; it is not
 * rethrown.
 *
 * @param entry the entry to buffer
 */
public void processRecord(PutRecordsRequestEntry entry) {
    try {
        // NOTE(review): estimateSize apparently returns a wider numeric type;
        // the narrowing cast is kept from the original code.
        int recordSize = (int) this.kinesisClientManager.estimateSize(entry);
        // enforce size and rate limits by blocking until new tokens are generated
        entriesRatePerShard.acquire();
        bytesRatePerShard.acquire(recordSize);
        // Flush at most once per record, and only when something is buffered.
        // Previously the byte check ran again on the freshly swapped (empty)
        // queue, so a single record at or above the byte limit triggered a
        // flush of an empty buffer, handing an empty batch to FlushTask.
        if (bufferQueue.size() > 0) {
            if (bufferQueue.size() + 1 >= MAX_ENTRIES_IN_REQUEST) {
                if (logger.isDebugEnabled()) {
                    logger.debug("buffer queue is at size limit. flushing");
                }
                this.flush();
            } else if (bufferQueue.getTotalByteCount() + recordSize >= MAX_BYTES_IN_REQUEST) {
                if (logger.isDebugEnabled()) {
                    logger.debug("buffer queue is at byte limit. flushing");
                }
                this.flush();
            }
        }
        // buffer the item
        bufferQueue.addWithSize(entry, recordSize);
    } catch (Exception e) {
        logger.warn("exception while buffering ", e);
        futureCallback.onFailure(e);
    }
}
/**
 * Detaches the current buffer and sends it out through a {@code FlushTask}.
 *
 * <p>The task is created over the existing {@code bufferQueue}, which is then
 * replaced by a fresh {@code QueueWithByteCount}. When {@code asyncFlush} is
 * set, the task is handed to {@code flushExec.addWork} and the returned handle
 * is recorded in {@code futures}; otherwise the task is flushed on the calling
 * thread.
 */
public void flush() {
    // Capture the old queue before swapping in a new one; order matters here.
    final FlushTask pending = new FlushTask(this.bufferQueue);
    this.bufferQueue = new QueueWithByteCount<>();

    if (!asyncFlush) {
        // synchronous mode: send inline
        pending.flush();
        return;
    }

    // default behavior: submit to the flush executor to be sent out to kinesis
    this.futures.add(flushExec.addWork(pending));
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment