Skip to content

Instantly share code, notes, and snippets.

@josh-richardson
Created August 28, 2019 10:25
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save josh-richardson/e5d35d39802422d47e5e69b66fe80807 to your computer and use it in GitHub Desktop.
Save josh-richardson/e5d35d39802422d47e5e69b66fe80807 to your computer and use it in GitHub Desktop.
/**
 * Sends {@code request} to every peer in {@code addresses} and returns one future per peer.
 *
 * <p>For each public key the peer URL is looked up through {@code networkNodes} and the
 * CBOR-serialized request is POSTed to {@code endpoint} on that peer. Once the response body has
 * been received, completion of the peer's future is delegated to {@code handleResponse}. Errors
 * reported through the request's exception handler complete the future exceptionally with an
 * {@code OrionException} carrying {@code NODE_PUSHING_TO_PEER} and the original cause.
 *
 * <p>The request is serialized once, before the stream is traversed, so serialization happens
 * (and may fail) even when {@code addresses} is empty.
 *
 * @param addresses public keys of the peers to contact; the stream is consumed by this call
 * @param request payload that is serialized as CBOR and sent to each peer
 * @param endpoint endpoint argument handed to the HTTP client's {@code post} call
 * @return one {@code CompletableFuture<Boolean>} per element of {@code addresses}, in the
 *     stream's encounter order
 */
CompletableFuture[] sendRequestsToOthers(Stream<Box.PublicKey> addresses, Serializable request, String endpoint) {
  // The serializer inputs do not vary per recipient, so do the work once instead of per peer.
  final byte[] payload = Serializer.serialize(CBOR, request);
  return addresses.map(pKey -> {
    URL recipientURL = networkNodes.urlForRecipient(pKey);
    log.info("Propagating request to {} with URL {} {}", pKey, recipientURL.toString(), endpoint);
    final CompletableFuture<Boolean> responseFuture = new CompletableFuture<>();
    // URL.getPort() returns -1 when the URL carries no explicit port; fall back to the
    // protocol's default port so such peers are still reachable.
    final int explicitPort = recipientURL.getPort();
    final int port = explicitPort != -1 ? explicitPort : recipientURL.getDefaultPort();
    httpClient
        .post(port, recipientURL.getHost(), endpoint)
        .putHeader("Content-Type", "application/cbor")
        // Wait for the full body before judging the response; the body itself is not inspected.
        .handler(response -> response.bodyHandler(responseBody -> {
          handleResponse(endpoint, pKey, recipientURL, responseFuture, response);
        }))
        .exceptionHandler(
            ex -> responseFuture.completeExceptionally(new OrionException(OrionErrorCode.NODE_PUSHING_TO_PEER, ex)))
        .end(Buffer.buffer(payload));
    return responseFuture;
  }).toArray(CompletableFuture[]::new);
}
/**
 * Completes {@code responseFuture} according to the HTTP status code of a peer's response.
 *
 * <p>A status of exactly 200 completes the future with {@code true}. Any other status completes
 * it exceptionally with an {@code OrionException} carrying
 * {@code NODE_PROPAGATING_TO_ALL_PEERS}.
 *
 * @param endpoint endpoint the request was sent to; used only in the success log message
 * @param pKey public key of the peer; used only for logging
 * @param recipientURL URL of the peer; used only for logging
 * @param responseFuture future that receives the outcome
 * @param response the peer's HTTP response; only its status code is read
 */
private void handleResponse(String endpoint, Box.PublicKey pKey, URL recipientURL, CompletableFuture<Boolean> responseFuture, HttpClientResponse response) {
  final String peerUrl = recipientURL.toString();
  final int status = response.statusCode();
  log.info("{} with URL {} responded with {}", pKey, peerUrl, status);

  if (status == 200) {
    log.info("Success for {}", endpoint);
    responseFuture.complete(true);
    return;
  }

  // Anything other than 200 is reported to the caller as a propagation failure.
  responseFuture.completeExceptionally(new OrionException(OrionErrorCode.NODE_PROPAGATING_TO_ALL_PEERS));
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment