Skip to content

Instantly share code, notes, and snippets.

@robertroeser
Created October 6, 2019 17:04
Show Gist options
  • Save robertroeser/14cbadc21ef26e518fffe4ebcf0fe112 to your computer and use it in GitHub Desktop.
Save robertroeser/14cbadc21ef26e518fffe4ebcf0fe112 to your computer and use it in GitHub Desktop.
Wrap a load balancer with an RSocket
package io.rsocket.client;
import io.rsocket.AbstractRSocket;
import io.rsocket.Payload;
import org.reactivestreams.Publisher;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
/**
 * Exposes a {@link LoadBalancedRSocketMono} through the RSocket interaction methods.
 *
 * <p>Every interaction is forwarded to the RSocket that the balancer emits for that call; this
 * class holds no socket of its own.
 *
 * <p>The lifecycles of this socket and the balancer are tied together in both directions:
 *
 * <ul>
 *   <li>when the balancer's {@code onClose()} terminates, this socket is disposed;
 *   <li>when this socket's {@code onClose()} terminates, the subscription watching the balancer is
 *       cancelled and the balancer itself is disposed.
 * </ul>
 */
public class LoadBalancedRSocket extends AbstractRSocket {

  /** Balancer that supplies the RSocket used for each interaction. */
  private final LoadBalancedRSocketMono loadBalancer;

  /**
   * Wraps the given balancer and links its lifecycle to this socket's lifecycle.
   *
   * @param lb the balancer to delegate to
   */
  public LoadBalancedRSocket(LoadBalancedRSocketMono lb) {
    this.loadBalancer = lb;

    // Balancer closed -> close this socket. The subscription handle is kept so it can be
    // cancelled once this socket itself is closed.
    Disposable balancerWatch = lb.onClose().doFinally(signal -> dispose()).subscribe();

    // This socket closed -> stop watching the balancer and shut it down.
    onClose().doFinally(signal -> releaseBalancer(balancerWatch)).subscribe();
  }

  /** Cancels the balancer watch subscription, then disposes the balancer. */
  private void releaseBalancer(Disposable balancerWatch) {
    balancerWatch.dispose();
    loadBalancer.dispose();
  }

  /** Forwards {@code fireAndForget} to the RSocket emitted by the balancer. */
  @Override
  public Mono<Void> fireAndForget(Payload payload) {
    return loadBalancer.flatMap(rsocket -> rsocket.fireAndForget(payload));
  }

  /** Forwards {@code requestResponse} to the RSocket emitted by the balancer. */
  @Override
  public Mono<Payload> requestResponse(Payload payload) {
    return loadBalancer.flatMap(rsocket -> rsocket.requestResponse(payload));
  }

  /** Forwards {@code requestStream} to the RSocket emitted by the balancer. */
  @Override
  public Flux<Payload> requestStream(Payload payload) {
    return loadBalancer.flatMapMany(rsocket -> rsocket.requestStream(payload));
  }

  /** Forwards {@code requestChannel} to the RSocket emitted by the balancer. */
  @Override
  public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
    return loadBalancer.flatMapMany(rsocket -> rsocket.requestChannel(payloads));
  }

  /** Forwards {@code metadataPush} to the RSocket emitted by the balancer. */
  @Override
  public Mono<Void> metadataPush(Payload payload) {
    return loadBalancer.flatMap(rsocket -> rsocket.metadataPush(payload));
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment