Skip to content

Instantly share code, notes, and snippets.



Last active Dec 26, 2018
What would you like to do?
DataSender using Netty Reactive Client
package data;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.netty.ByteBufFlux;
import reactor.netty.http.client.HttpClient;
import reactor.retry.Retry;
import settings.Settings;
import java.time.Duration;
class DataSender {
private static final String SESSION_HEADER_KEY = "X-BZM-SESSION";
private static final String SESSION_TOKEN_HEADER_KEY = "X-BZM-SESSION-TOKEN";
private static final Duration FIRST_BACKOFF = Duration.ofMillis(100);
private static final Duration MAX_BACKOFF = Duration.ofSeconds(20);
private static final String CLIENT_APP_NAME = "jetpack";
private static final int MAX_RETRIES = 8;
private static final String REQUEST_FAILED = "Request failed";
private final Settings settings;
DataSender(Settings settings) {
this.settings = settings;
Mono<String> submitDataWithRetries(String body, Target target) {
final String uri = this.getSubmitUri(target);
final Retry<Object> retryStrategy = Retry
.exponentialBackoffWithJitter(FIRST_BACKOFF, MAX_BACKOFF);
return HttpClient
.headers(h -> {
h.add("Content-Type", "application/json");
h.add("Accept", "application/json");
h.add(SESSION_HEADER_KEY, settings.getSessionId());
h.add(SESSION_TOKEN_HEADER_KEY, settings.getSessionToken());
.onErrorReturn(error -> error.getCause() instanceof ErrorCodeException, "failure");
private String getSubmitUri(Target target) {
return String.format(
"%s?client=%s&target=%s", settings.getSubmitUrl(), CLIENT_APP_NAME, target.toString());
private static class ErrorCodeException extends Exception {
private final int statusCode;
private final String error;
ErrorCodeException(int statusCode, String error) {
this.statusCode = statusCode;
this.error = error;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.