Skip to content

Instantly share code, notes, and snippets.

@lucas-dclrcq
Last active October 4, 2021 15:57
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save lucas-dclrcq/41712931a4b3a37a66faf5fed6c8fd9c to your computer and use it in GitHub Desktop.
Save lucas-dclrcq/41712931a4b3a37a66faf5fed6c8fd9c to your computer and use it in GitHub Desktop.
Heyp Kafka Retry Mechanism
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
/**
 * Delayed-retry consumer: a record is only processed once it is older than
 * {@link #RETRY_DELAY}. When a record that is still too young is encountered,
 * its partition is paused and rewound to that record, and consumption of the
 * partition resumes once the record has aged enough.
 *
 * <p>Instances keep mutable per-partition state and are meant to be driven by a
 * single thread calling {@link #execute()}.
 */
class Scratch {
    /** Minimum age a record must have before it is processed. */
    private static final Duration RETRY_DELAY = Duration.ofMinutes(15);

    /** Upper bound on how long a single {@code poll} call blocks. */
    private static final Duration POLL_TIMEOUT = Duration.ofSeconds(5);

    /** Topics passed to {@code subscribe}; always the single topic given to the constructor. */
    private final List<String> consumedTopic;

    /** For each partition paused by this instance, the instant at which its next record becomes due. */
    private final Map<TopicPartition, Instant> pausedUntil = new HashMap<>();

    /**
     * @param topic name of the topic to consume; must not be {@code null}
     * @throws NullPointerException if {@code topic} is {@code null}
     */
    public Scratch(String topic) {
        this.consumedTopic = List.of(topic);
    }

    /**
     * Runs the consume loop: resume partitions whose delay has elapsed, poll,
     * process every record that is old enough, then commit.
     *
     * <p>The loop has no normal exit; it ends only when a consumer call throws,
     * in which case the consumer is closed before the exception propagates.
     * Polling continues even while partitions are paused.
     *
     * <p>NOTE(review): the empty configuration map is a placeholder carried over
     * from the original sketch; real settings (bootstrap servers, group id,
     * deserializers) presumably have to be supplied here - confirm.
     */
    public void execute() {
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(Map.of())) {
            consumer.subscribe(consumedTopic);
            while (true) {
                resumeDuePartitions(consumer);

                final ConsumerRecords<String, String> batch = consumer.poll(POLL_TIMEOUT);
                if (batch.isEmpty()) {
                    continue;
                }
                for (TopicPartition partition : batch.partitions()) {
                    processPartition(consumer, partition, batch.records(partition));
                }
                // Commit once per batch. Deferred partitions were rewound with seek(),
                // so the committed positions cover processed records only.
                consumer.commitSync();
            }
        }
    }

    /** Resumes every paused partition whose deadline has passed and forgets partitions no longer assigned. */
    private void resumeDuePartitions(KafkaConsumer<String, String> consumer) {
        // Resuming a partition that is not assigned to this consumer is an error,
        // so drop entries for partitions that were taken away in the meantime.
        pausedUntil.keySet().retainAll(consumer.assignment());

        final Instant now = Instant.now();
        final List<TopicPartition> due = pausedUntil.entrySet().stream()
                .filter(entry -> entry.getValue().isBefore(now))
                .map(Map.Entry::getKey)
                .collect(Collectors.toList());
        if (due.isEmpty()) {
            return;
        }
        consumer.resume(due);
        due.forEach(pausedUntil::remove);
    }

    /**
     * Processes the records of one partition in offset order and stops at the
     * first record that is not yet old enough, pausing and rewinding the partition.
     */
    private void processPartition(
            KafkaConsumer<String, String> consumer,
            TopicPartition partition,
            List<ConsumerRecord<String, String>> records) {
        for (ConsumerRecord<String, String> record : records) {
            // record.timestamp() is expressed in epoch milliseconds.
            final Instant dueAt = Instant.ofEpochMilli(record.timestamp()).plus(RETRY_DELAY);
            if (!dueAt.isBefore(Instant.now())) {
                consumer.pause(List.of(partition));
                // Required: poll() already moved the position past this batch, so rewind
                // to the deferred record to have it delivered again after resume().
                consumer.seek(partition, record.offset());
                pausedUntil.put(partition, dueAt);
                return;
            }
            // do something with record here
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment