-
-
Save abelyansky/6f29ef567d1ee149857457248c68a230 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public void processRecord(PutRecordsRequestEntry entry) { | |
try { | |
int recordSize = (int) this.kinesisClientManager.estimateSize(entry); | |
// enforce size and rate limits by blocking until new tokens are generated | |
entriesRatePerShard.acquire(); | |
bytesRatePerShard.acquire(recordSize); | |
if (bufferQueue.size() + 1 >= MAX_ENTRIES_IN_REQUEST) { | |
if (logger.isDebugEnabled()) logger.debug("buffer queue is at size limit. flushing"); | |
this.flush(); | |
} | |
if (bufferQueue.getTotalByteCount() + recordSize >= MAX_BYTES_IN_REQUEST) { | |
if (logger.isDebugEnabled()) logger.debug("buffer queue is at byte limit. flushing"); | |
this.flush(); | |
} | |
// buffer the item | |
bufferQueue.addWithSize(entry, recordSize); | |
} catch (Exception e) { | |
logger.warn("exception while buffering ", e); | |
futureCallback.onFailure(e); | |
} | |
} | |
public void flush() { | |
FlushTask task = new FlushTask(this.bufferQueue); | |
this.bufferQueue = new QueueWithByteCount<>(); | |
if (asyncFlush) { | |
// submit to the thread pool to be sent out to kinesis - default behavior | |
this.futures.add(flushExec.addWork(task)); | |
} else { | |
task.flush(); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment