Last active
December 26, 2018 07:33
-
-
Save pgilad/de5a530a10f8305af21426900e973aa5 to your computer and use it in GitHub Desktop.
DataSender using Netty Reactive Client
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package data; | |
import lombok.Getter; | |
import lombok.extern.slf4j.Slf4j; | |
import reactor.core.publisher.Flux; | |
import reactor.core.publisher.Mono; | |
import reactor.netty.ByteBufFlux; | |
import reactor.netty.http.client.HttpClient; | |
import reactor.retry.Retry; | |
import settings.Settings; | |
import java.time.Duration; | |
@Slf4j | |
class DataSender { | |
private static final String SESSION_HEADER_KEY = "X-BZM-SESSION"; | |
private static final String SESSION_TOKEN_HEADER_KEY = "X-BZM-SESSION-TOKEN"; | |
private static final Duration FIRST_BACKOFF = Duration.ofMillis(100); | |
private static final Duration MAX_BACKOFF = Duration.ofSeconds(20); | |
private static final String CLIENT_APP_NAME = "jetpack"; | |
private static final int MAX_RETRIES = 8; | |
private static final String REQUEST_FAILED = "Request failed"; | |
private final Settings settings; | |
DataSender(Settings settings) { | |
this.settings = settings; | |
} | |
Mono<String> submitDataWithRetries(String body, Target target) { | |
final String uri = this.getSubmitUri(target); | |
final Retry<Object> retryStrategy = Retry | |
.any() | |
.retryMax(MAX_RETRIES) | |
.exponentialBackoffWithJitter(FIRST_BACKOFF, MAX_BACKOFF); | |
return HttpClient | |
.create() | |
.headers(h -> { | |
h.add("Content-Type", "application/json"); | |
h.add("Accept", "application/json"); | |
h.add(SESSION_HEADER_KEY, settings.getSessionId()); | |
h.add(SESSION_TOKEN_HEADER_KEY, settings.getSessionToken()); | |
}) | |
.followRedirect(true) | |
.compress(settings.getCompress()) | |
.post() | |
.uri(uri) | |
.send(ByteBufFlux.fromString(Flux.just(body))) | |
.responseContent() | |
.aggregate() | |
.asString() | |
.retryWhen(retryStrategy) | |
.onErrorReturn(error -> error.getCause() instanceof ErrorCodeException, "failure"); | |
} | |
private String getSubmitUri(Target target) { | |
return String.format( | |
"%s?client=%s&target=%s", settings.getSubmitUrl(), CLIENT_APP_NAME, target.toString()); | |
} | |
private static class ErrorCodeException extends Exception { | |
@Getter | |
private final int statusCode; | |
@Getter | |
private final String error; | |
@SuppressWarnings("SameParameterValue") | |
ErrorCodeException(int statusCode, String error) { | |
this.statusCode = statusCode; | |
this.error = error; | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment