Skip to content

Instantly share code, notes, and snippets.

@pgilad
Last active December 26, 2018 07:33
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save pgilad/de5a530a10f8305af21426900e973aa5 to your computer and use it in GitHub Desktop.
Save pgilad/de5a530a10f8305af21426900e973aa5 to your computer and use it in GitHub Desktop.
DataSender using Netty Reactive Client
package data;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.netty.ByteBufFlux;
import reactor.netty.http.client.HttpClient;
import reactor.retry.Retry;
import settings.Settings;
import java.time.Duration;
@Slf4j
class DataSender {
private static final String SESSION_HEADER_KEY = "X-BZM-SESSION";
private static final String SESSION_TOKEN_HEADER_KEY = "X-BZM-SESSION-TOKEN";
private static final Duration FIRST_BACKOFF = Duration.ofMillis(100);
private static final Duration MAX_BACKOFF = Duration.ofSeconds(20);
private static final String CLIENT_APP_NAME = "jetpack";
private static final int MAX_RETRIES = 8;
private static final String REQUEST_FAILED = "Request failed";
private final Settings settings;
DataSender(Settings settings) {
this.settings = settings;
}
Mono<String> submitDataWithRetries(String body, Target target) {
final String uri = this.getSubmitUri(target);
final Retry<Object> retryStrategy = Retry
.any()
.retryMax(MAX_RETRIES)
.exponentialBackoffWithJitter(FIRST_BACKOFF, MAX_BACKOFF);
return HttpClient
.create()
.headers(h -> {
h.add("Content-Type", "application/json");
h.add("Accept", "application/json");
h.add(SESSION_HEADER_KEY, settings.getSessionId());
h.add(SESSION_TOKEN_HEADER_KEY, settings.getSessionToken());
})
.followRedirect(true)
.compress(settings.getCompress())
.post()
.uri(uri)
.send(ByteBufFlux.fromString(Flux.just(body)))
.responseContent()
.aggregate()
.asString()
.retryWhen(retryStrategy)
.onErrorReturn(error -> error.getCause() instanceof ErrorCodeException, "failure");
}
private String getSubmitUri(Target target) {
return String.format(
"%s?client=%s&target=%s", settings.getSubmitUrl(), CLIENT_APP_NAME, target.toString());
}
private static class ErrorCodeException extends Exception {
@Getter
private final int statusCode;
@Getter
private final String error;
@SuppressWarnings("SameParameterValue")
ErrorCodeException(int statusCode, String error) {
this.statusCode = statusCode;
this.error = error;
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment