Skip to content

Instantly share code, notes, and snippets.

@bmaggi
Last active July 8, 2022 02:34
Show Gist options
  • Save bmaggi/8e42a16a02f18d3bff9b0b742a75bfe7 to your computer and use it in GitHub Desktop.
Save bmaggi/8e42a16a02f18d3bff9b0b742a75bfe7 to your computer and use it in GitHub Desktop.
Transform KafakFuture <-> CompletableFuture
// See https://issues.apache.org/jira/browse/KAFKA-6987 for ongoing work on KafkaFuture implementation of CompletableFuture
private <T> CompletableFuture<T> toCompletableFuture(final KafkaFuture<T> kafkaFuture) {
final CompletableFuture<T> wrappingFuture = new CompletableFuture<>();
kafkaFuture.whenComplete((value, throwable) -> {
if (throwable != null) {
wrappingFuture.completeExceptionally(throwable);
} else {
wrappingFuture.complete(value);
}
});
return wrappingFuture;
}
private <T> KafkaFuture<T> toKafkaFuture(final CompletableFuture<T> completableFuture) {
final KafkaFutureImpl<T> wrappingFuture = new KafkaFutureImpl<>();
completableFuture.whenComplete((value, throwable) -> {
if (throwable != null) {
wrappingFuture.completeExceptionally(throwable);
} else {
wrappingFuture.complete(value);
}
});
return wrappingFuture;
}
@armorsuitable
Copy link

call toCompletableFuture : org.apache.kafka.common.errors.TimeoutException: The AdminClient thread is not accepting new calls.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment