Skip to content

Instantly share code, notes, and snippets.

@ilterpehlivan
Created June 6, 2020 03:45
Show Gist options
  • Save ilterpehlivan/92e1121998e3678e74ad0556fb9018ff to your computer and use it in GitHub Desktop.
Save ilterpehlivan/92e1121998e3678e74ad0556fb9018ff to your computer and use it in GitHub Desktop.
LoadBalancedRSocketMono SignalType issue
// Build script for the org.example RSocket load-balancer sample.
plugins {
    id 'java'
}

group 'org.example'
version '1.0-SNAPSHOT'

repositories {
    mavenCentral()
    // Declared directly: the nested repositories {} wrapper added nothing.
    maven { url 'https://oss.jfrog.org/oss-snapshot-local' }
}

dependencies {
    // 'compile' / 'testCompile' are deprecated and were removed in Gradle 7;
    // 'implementation' / 'testImplementation' are their replacements.
    // The BOM supplies the versions for the io.rsocket modules listed without one.
    implementation platform('io.rsocket:rsocket-bom:1.0.0')
    implementation group: 'io.rsocket', name: 'rsocket-core'
    implementation group: 'io.rsocket', name: 'rsocket-transport-netty'
    implementation group: 'io.rsocket', name: 'rsocket-load-balancer'
    implementation group: 'ch.qos.logback', name: 'logback-classic', version: '1.2.3'

    compileOnly 'org.projectlombok:lombok:1.18.12'
    annotationProcessor 'org.projectlombok:lombok:1.18.12'

    testImplementation group: 'junit', name: 'junit', version: '4.12'
}
package org.example;
import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.SocketAcceptor;
import io.rsocket.client.LoadBalancedRSocketMono;
import io.rsocket.client.filter.RSocketSupplier;
import io.rsocket.core.RSocketConnector;
import io.rsocket.core.RSocketServer;
import io.rsocket.frame.decoder.PayloadDecoder;
import io.rsocket.transport.netty.client.TcpClientTransport;
import io.rsocket.transport.netty.server.TcpServerTransport;
import io.rsocket.util.DefaultPayload;
import java.time.Duration;
import java.util.Collections;
import java.util.concurrent.CountDownLatch;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@Slf4j
public class Main {
public static void main(String[] args) throws InterruptedException {
RSocketServer.create(
SocketAcceptor.with(
new RSocket() {
@Override
public Mono<Payload> requestResponse(Payload payload) {
return Mono.just(DefaultPayload.create("Hello server"));
}
}))
.interceptors(registry -> registry.forResponder(new TestInterceptor()))
.bind(TcpServerTransport.create("localhost", 7000))
.subscribe();
CountDownLatch rsocketInit = new CountDownLatch(1);
Mono<RSocket> rSocketMono =
RSocketConnector.create()
.payloadDecoder(PayloadDecoder.ZERO_COPY)
.interceptors(registry -> registry.forRequester(new TestInterceptor()))
.connect(TcpClientTransport.create("localhost", 7000))
.doOnSuccess(
rSocket -> {
log.info("connected to server successfully {}", rSocket.availability());
rsocketInit.countDown();
})
.doOnSubscribe(s -> log.info("trying to connect service {} ", "localhost:7000"));
;
LoadBalancedRSocketMono loadBalancedRSocketMono =
LoadBalancedRSocketMono.create(
Flux.just(Collections.singleton(new RSocketSupplier(() -> rSocketMono))),
LoadBalancedRSocketMono.DEFAULT_EXP_FACTOR,
LoadBalancedRSocketMono.DEFAULT_LOWER_QUANTILE,
LoadBalancedRSocketMono.DEFAULT_HIGHER_QUANTILE,
LoadBalancedRSocketMono.DEFAULT_MIN_PENDING,
LoadBalancedRSocketMono.DEFAULT_MAX_PENDING,
LoadBalancedRSocketMono.DEFAULT_MIN_APERTURE,
LoadBalancedRSocketMono.DEFAULT_MAX_APERTURE,
LoadBalancedRSocketMono.DEFAULT_MAX_REFRESH_PERIOD_MS,
5,
// TODO: make them configurable as well
Duration.ofMillis(300),
Duration.ofSeconds(5));
rsocketInit.await();
log.info("\n\nStart of requestResponse interaction\n" + "----------------------------------\n");
loadBalancedRSocketMono
.flatMap(
socket ->
socket
.requestResponse(DefaultPayload.create("Hello Client"))
.doOnRequest(e -> log.debug("Client sends requestN(" + e + ")"))
.map(Payload::getDataUtf8)
.doOnNext(log::info))
.retry()
.doOnError(
er -> {
log.error("error ", er);
})
.block();
log.info("\n\nFinished\n" + "-----------------------------------\n");
}
}
package org.example;
import io.rsocket.RSocket;
import io.rsocket.plugins.RSocketInterceptor;
/** Interceptor that decorates every {@link RSocket} it receives with a {@link TestRsocket}. */
public class TestInterceptor implements RSocketInterceptor {

  /**
   * Wraps the supplied socket.
   *
   * @param rSocket the socket to decorate
   * @return a new {@link TestRsocket} constructed around {@code rSocket}
   */
  @Override
  public RSocket apply(RSocket rSocket) {
    final RSocket decorated = new TestRsocket(rSocket);
    return decorated;
  }
}
package org.example;
import io.rsocket.Payload;
import io.rsocket.RSocket;
import lombok.extern.slf4j.Slf4j;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
/**
 * {@link RSocket} decorator that logs request-response calls and forwards every operation to
 * the wrapped socket.
 */
@Slf4j
public class TestRsocket implements RSocket {

  private final RSocket delegate;

  /**
   * @param delegate the socket all calls are forwarded to; must not be {@code null}
   * @throws NullPointerException if {@code delegate} is {@code null}
   */
  public TestRsocket(RSocket delegate) {
    this.delegate = java.util.Objects.requireNonNull(delegate, "delegate");
  }

  /** Forwards to the delegate; a reactive method must never return {@code null}. */
  @Override
  public Mono<Void> fireAndForget(Payload payload) {
    return delegate.fireAndForget(payload);
  }

  /**
   * Forwards the request to the delegate, logging the call, any cancellation and the signal
   * that terminates the response.
   */
  @Override
  public Mono<Payload> requestResponse(Payload payload) {
    log.info("iniside the interceptor requestresponse");
    return delegate
        .requestResponse(payload)
        .doOnCancel(() -> log.info("it is cancelled"))
        .doFinally(signalType -> log.info("doFinally: signal {}", signalType));
  }

  /** Forwards to the delegate; a reactive method must never return {@code null}. */
  @Override
  public Flux<Payload> requestStream(Payload payload) {
    return delegate.requestStream(payload);
  }

  /** Forwards to the delegate; a reactive method must never return {@code null}. */
  @Override
  public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
    return delegate.requestChannel(payloads);
  }

  /** Forwards to the delegate. */
  @Override
  public Mono<Void> metadataPush(Payload payload) {
    return delegate.metadataPush(payload);
  }

  // Availability and lifecycle are forwarded as well, so that wrapping a socket does not hide
  // the wrapped socket's state from callers that inspect it.

  @Override
  public double availability() {
    return delegate.availability();
  }

  @Override
  public void dispose() {
    delegate.dispose();
  }

  @Override
  public boolean isDisposed() {
    return delegate.isDisposed();
  }

  @Override
  public Mono<Void> onClose() {
    return delegate.onClose();
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment