Skip to content

Instantly share code, notes, and snippets.

@Schachte
Last active March 21, 2020 07:01
Show Gist options
  • Save Schachte/e12aaf007971b06a3bc0ed168c5f983e to your computer and use it in GitHub Desktop.
Save Schachte/e12aaf007971b06a3bc0ed168c5f983e to your computer and use it in GitHub Desktop.
Consumer batching with record count and time
import static java.util.Objects.isNull;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.UUID;
import java.util.function.Predicate;
/**
 * Replicates custom batching in a fake consumer: polled records are buffered in memory and
 * flushed as one batch as soon as either a minimum record count has been reached or a maximum
 * amount of time has elapsed since the batch was opened, whichever happens first.
 */
public class Playground {
    /** Number of buffered records at which a batch is flushed. */
    private static final int FETCH_MIN_RECORDS = 4650000;
    /** Maximum time, in milliseconds, a batch may stay open before it is flushed. */
    private static final int MAX_TIMEOUT_MS = 100;

    /** True once the buffered record count has reached the configured minimum. */
    private static final Predicate<Integer> minRecordsSatisfied =
            (records) -> records >= FETCH_MIN_RECORDS;
    /** True once more than {@link #MAX_TIMEOUT_MS} ms have passed since the given start time. */
    private static final Predicate<Instant> maxTimeoutExceeded =
            (startTime) -> Duration.between(startTime, Instant.now()).toMillis() > MAX_TIMEOUT_MS;

    /** Time the current batch was opened; {@code null} while no batch is open. */
    private Instant start = null;
    /** Records accumulated for the current batch. */
    private final List<ConsumerRecord> recordBuffer = new ArrayList<>();

    /**
     * Runs the fake consumer loop forever: poll, buffer, and flush whenever a batching
     * criterion is met.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Playground consumer = new Playground();
        // Replicates Base Consumer
        while (true) {
            List<ConsumerRecord> records = consumer.poll();
            // Batch records in-memory until criteria is satisfied
            consumer.recordBuffer.addAll(records);
            // The batch timer starts with the first poll after the previous flush.
            if (isNull(consumer.start)) {
                consumer.start = Instant.now();
            }
            // Process records when either criteria is hit first
            if (isBatchReady(consumer.recordBuffer.size(), consumer.start)) {
                consumer.processRecords(consumer.recordBuffer);
            }
        }
    }

    /**
     * Decides whether the current batch should be flushed.
     *
     * @param bufferedRecords number of records currently buffered
     * @param batchStart time the current batch was opened
     * @return {@code true} if the record count has reached {@code FETCH_MIN_RECORDS} or more
     *         than {@code MAX_TIMEOUT_MS} milliseconds have elapsed since {@code batchStart}
     */
    static boolean isBatchReady(int bufferedRecords, Instant batchStart) {
        return minRecordsSatisfied.test(bufferedRecords) || maxTimeoutExceeded.test(batchStart);
    }

    /**
     * Fakes uploading a batch and then commits, which clears this consumer's buffer and
     * resets the batch timer.
     *
     * @param records the batch to "upload"; only its size is reported
     */
    public void processRecords(List<ConsumerRecord> records) {
        System.out.println("Uploading " + records.size() + " records to S3.");
        commitOffset();
    }

    /**
     * Replicate a fake consumer poll event
     *
     * @return a new list of between 1 and 10 records, each carrying a random UUID string
     */
    public List<ConsumerRecord> poll() {
        int amount = (int) (Math.random() * 10) + 1;
        List<ConsumerRecord> data = new ArrayList<>(amount);
        for (int i = 0; i < amount; i++) {
            data.add(new ConsumerRecord(UUID.randomUUID().toString()));
        }
        return data;
    }

    /**
     * Replicate a fake offset commit on all batched records: empties the buffer and clears
     * the batch start time so the next poll opens a new batch.
     */
    public void commitOffset() {
        System.out.println("Offset committed!");
        recordBuffer.clear();
        start = null;
    }

    /**
     * Minimal stand-in for a consumed record. Declared static so that instances do not keep
     * a reference to the enclosing {@code Playground}.
     */
    static class ConsumerRecord {
        String value;

        public ConsumerRecord(String value) {
            this.value = value;
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment