Skip to content

Instantly share code, notes, and snippets.

@jsancio
Created February 11, 2021 02:04
Show Gist options
  • Save jsancio/06bdffc9f25450127b1dc730ebbd55f7 to your computer and use it in GitHub Desktop.
Save jsancio/06bdffc9f25450127b1dc730ebbd55f7 to your computer and use it in GitHub Desktop.
class BatchAccumulator {
    // ...

    /**
     * Appends {@code records} to the accumulator on behalf of the given epoch.
     *
     * <p>The serialized size of every record is computed up front, before
     * {@code appendLock} is taken, using a serialization cache that is shared with
     * the later size lookups and {@code appendRecord} calls. Everything that touches
     * the batch or {@code nextOffset} then runs while holding {@code appendLock}.
     *
     * <p>When {@code isAtomic} is true, a single batch is requested for the combined
     * size of all records. Otherwise a batch is requested per record, sized for that
     * record alone; in that mode records appended before a failed allocation stay
     * appended and {@code nextOffset} has already advanced past them.
     *
     * @param epoch    epoch the caller believes is current
     * @param records  records to append, in iteration order
     * @param isAtomic whether a single allocation must cover all records
     * @return {@code Long.MAX_VALUE} if {@code epoch} differs from this accumulator's
     *         epoch; {@code null} if {@code maybeAllocateBatch} yielded no batch for a
     *         record (presumably no space is available; confirm against that helper);
     *         otherwise {@code nextOffset - 1} as it stands after the last append
     */
    private Long append(int epoch, List<T> records, boolean isAtomic) {
        // Reject appends from a different epoch before doing any work.
        if (this.epoch != epoch) {
            return Long.MAX_VALUE;
        }

        // Size pass runs outside the lock; results are remembered in the cache.
        ObjectSerializationCache cache = new ObjectSerializationCache();
        int totalSize = 0;
        for (T item : records) {
            totalSize += serde.recordSize(item, cache);
        }

        appendLock.lock();
        try {
            maybeCompleteDrain();

            // Atomic appends get one batch sized for everything; non-atomic appends
            // start with none and request one per record inside the loop.
            BatchBuilder<T> target = isAtomic ? maybeAllocateBatch(totalSize) : null;

            for (T item : records) {
                if (!isAtomic) {
                    target = maybeAllocateBatch(serde.recordSize(item, cache));
                }

                // NOTE(review): this check only runs per record, so an empty
                // `records` list never observes a null batch and falls through to
                // the normal return below.
                if (target == null) {
                    return null;
                }

                target.appendRecord(item, cache);
                nextOffset++;
            }

            maybeResetLinger();
            return nextOffset - 1;
        } finally {
            // Always release the lock, including on the early null return.
            appendLock.unlock();
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment