Created
February 11, 2021 02:04
-
-
Save jsancio/06bdffc9f25450127b1dc730ebbd55f7 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class BatchAccumulator { | |
// ... | |
private Long append(int epoch, List<T> records, boolean isAtomic) { | |
if (epoch != this.epoch) { | |
return Long.MAX_VALUE; | |
} | |
ObjectSerializationCache serializationCache = new ObjectSerializationCache(); | |
int batchSize = 0; | |
for (T record : records) { | |
batchSize += serde.recordSize(record, serializationCache); | |
} | |
appendLock.lock(); | |
try { | |
maybeCompleteDrain(); | |
BatchBuilder<T> batch = null; | |
if (isAtomic) { | |
batch = maybeAllocateBatch(batchSize); | |
} | |
for (T record : records) { | |
if (!isAtomic) { | |
batch = maybeAllocateBatch( | |
serde.recordSize(record, serializationCache) | |
); | |
} | |
if (batch == null) { | |
return null; | |
} | |
batch.appendRecord(record, serializationCache); | |
nextOffset += 1; | |
} | |
maybeResetLinger(); | |
return nextOffset - 1; | |
} finally { | |
appendLock.unlock(); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment