Skip to content

Instantly share code, notes, and snippets.

@zlatozar
Last active March 18, 2023 08:22
Show Gist options
  • Save zlatozar/6c1e1d5b8b6cb9fb7bd2da89a10ad26c to your computer and use it in GitHub Desktop.
Save zlatozar/6c1e1d5b8b6cb9fb7bd2da89a10ad26c to your computer and use it in GitHub Desktop.
Organise Kafka retries to third-party systems
/**
 * Kafka Streams processor that hands each record to a third-party API and keeps
 * the records whose call failed in a key-value state store, so that a periodic
 * wall-clock punctuation can try them again.
 *
 * <p>The actual API call and the re-processing step are placeholders in this
 * snippet ({@code // call API}, {@code // re-process}) and must be filled in
 * before the class compiles.
 *
 * <p>NOTE(review): a state store named {@code "failed-message-store"} is assumed
 * to be registered in the topology and connected to this processor - confirm
 * against the topology definition.
 */
public class RetryProcessor extends org.apache.kafka.streams.processor.AbstractProcessor<String, Object> {

    /** Interval, in milliseconds, between two retry passes over the failed-message store. */
    private static final long SCHEDULER_INTERVAL_MS = 60000;

    /** Name under which the failed-message store is looked up in the processor context. */
    private static final String ENTITY_STORE_NAME = "failed-message-store";

    /** Records whose API call failed, keyed by record key; assigned in {@link #init}. */
    private KeyValueStore<String, Object> entityStore;

    /**
     * Looks up the failed-message store and schedules the periodic retry pass.
     *
     * @param context processor context supplied by Kafka Streams
     */
    @Override
    public void init(final ProcessorContext context) {
        // Let the base class remember the context so that context() works afterwards.
        super.init(context);

        // The store is looked up by name only, so its key/value types cannot be
        // verified here; the cast is unchecked by nature.
        @SuppressWarnings("unchecked")
        final KeyValueStore<String, Object> store =
                (KeyValueStore<String, Object>) context.getStateStore(ENTITY_STORE_NAME);
        this.entityStore = store;

        context.schedule(Duration.ofMillis(SCHEDULER_INTERVAL_MS), PunctuationType.WALL_CLOCK_TIME,
                timestamp -> processFailedMessagesStore());
    }

    /**
     * Calls the third-party API for one record and stores the record for a later
     * retry when the call fails. A later failure for the same key overwrites the
     * previously stored value.
     *
     * @param key   record key, also used as the key in the failed-message store
     * @param value record value to send and, on failure, to store
     */
    @Override
    public void process(final String key, final Object value) {
        // Placeholder kept from the original snippet: substitute the real API call.
        boolean apiCallSuccessful = // call API
        if (!apiCallSuccessful) {
            entityStore.put(key, value);
        }
    }

    // Helper methods

    /**
     * Walks over every stored failed record, re-processes it and removes it from
     * the store once re-processing succeeds. Records that fail again stay in the
     * store for the next pass.
     */
    private void processFailedMessagesStore() {
        // The iterator holds store resources, so it is closed via try-with-resources.
        try (KeyValueIterator<String, Object> allItems = entityStore.all()) {
            allItems.forEachRemaining(item -> {
                // Placeholder kept from the original snippet: substitute the real retry.
                boolean successfullyProcessed = // re-process
                if (successfullyProcessed) {
                    entityStore.delete(item.key);
                }
            });
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment