Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save mdindoffer/c0e65bd9075ca7bdc8b639ebbf2c0fde to your computer and use it in GitHub Desktop.
Save mdindoffer/c0e65bd9075ca7bdc8b639ebbf2c0fde to your computer and use it in GitHub Desktop.
RSocket Requester & Responder leak
import io.rsocket.RSocket;
import io.rsocket.core.RSocketConnector;
import io.rsocket.frame.decoder.PayloadDecoder;
import io.rsocket.test.PingClient;
import io.rsocket.transport.netty.client.TcpClientTransport;
import org.HdrHistogram.Recorder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;
/**
 * Client half of the RSocket requester/responder leak reproducer.
 *
 * <p>Connects over TCP to port 7878, drives a fixed number of request-response ping-pongs through
 * {@link PingClient}, then disposes the blocked-on {@link RSocket} and waits for it to close.
 */
public class RequesterResponderLeakReproducerClient {

    private static final Logger LOG = LoggerFactory.getLogger(RequesterResponderLeakReproducerClient.class);

    /**
     * Entry point: runs a single connect / ping-pong / close cycle.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        new RequesterResponderLeakReproducerClient().createClientAndExchangePayloadTCP();
    }

    /**
     * Connects to the server, exchanges 1000 request-response payloads and closes the connection.
     *
     * <p>The connection is disposed in a {@code finally} block so that a failing ping-pong run does
     * not leave the socket open.
     */
    private void createClientAndExchangePayloadTCP() {
        Mono<RSocket> client =
                RSocketConnector.create()
                        .payloadDecoder(PayloadDecoder.ZERO_COPY)
                        .connect(TcpClientTransport.create(7878));

        // NOTE(review): the same connect Mono is handed to PingClient and also block()ed below.
        // If the Mono yields a new connection per subscriber (reconnect is not enabled here), the
        // ping traffic presumably runs on a different connection than clientRsocket - confirm
        // this is intended for the reproducer.
        PingClient pingClient = new PingClient(client);
        RSocket clientRsocket = client.block();

        // The throwable goes in as the trailing argument without a "{}" placeholder: SLF4J treats
        // a trailing Throwable as the exception to log, so a placeholder would be left unfilled.
        clientRsocket.onClose().subscribe(
                null,
                throwable -> LOG.error("RSocket client throwable", throwable),
                () -> LOG.info("Completed closing of the Client"));

        int count = 1_000;
        try {
            pingClient
                    .requestResponsePingPong(count, new Recorder(3600000000000L, 3))
                    .doOnTerminate(() -> LOG.info("Sent {} messages", count))
                    .blockLast();
        } finally {
            // Always release the connection, even when the ping-pong exchange failed.
            clientRsocket.dispose();
            clientRsocket.onClose().block();
        }
    }
}
import io.rsocket.core.RSocketServer;
import io.rsocket.frame.decoder.PayloadDecoder;
import io.rsocket.test.PingHandler;
import io.rsocket.transport.netty.server.CloseableChannel;
import io.rsocket.transport.netty.server.TcpServerTransport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Hooks;
import reactor.core.publisher.Mono;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
/**
 * Server half of the RSocket requester/responder leak reproducer.
 *
 * <p>Starts a TCP RSocket server on port 7878 backed by {@link PingHandler} and installs a Reactor
 * {@code onLastOperator} hook that counts, per calling stack frame, subscriptions that have not
 * yet terminated. The counts are logged every two seconds.
 */
public class RequesterResponderLeakReproducerServer {

    private static final Logger LOG = LoggerFactory.getLogger(RequesterResponderLeakReproducerServer.class);

    /**
     * Number of not-yet-terminated subscriptions keyed by the string form of the first stack frame
     * outside {@code java.*}, {@code reactor.*} and this class. Entries are removed when their
     * count drops to zero. Updated from hook callbacks and read by the reporting loop.
     */
    private final Map<String, Integer> subscriptionStats = Collections.synchronizedMap(new HashMap<>());

    /**
     * Entry point: starts the server and reports subscription statistics until interrupted.
     *
     * @param args ignored
     * @throws InterruptedException if the reporting loop is interrupted while sleeping
     */
    public static void main(String[] args) throws InterruptedException {
        new RequesterResponderLeakReproducerServer().run();
    }

    /**
     * Installs the hooks, starts the server and then loops forever: request a GC, print the
     * current statistics, sleep two seconds.
     *
     * @throws InterruptedException if interrupted while sleeping between reports
     */
    private void run() throws InterruptedException {
        // Hooks must be registered before the server creates any reactive pipelines.
        registerReactorHooks();
        createAndStartServerTCP();
        while (true) {
            System.gc();
            printSubscriptionStats();
            Thread.sleep(2000);
        }
    }

    /**
     * Binds the ping server to TCP port 7878 and blocks until the bind completes.
     *
     * @return the channel of the started server
     */
    private CloseableChannel createAndStartServerTCP() {
        return RSocketServer.create(new PingHandler())
                .payloadDecoder(PayloadDecoder.ZERO_COPY)
                .bind(TcpServerTransport.create(7878))
                .block();
    }

    /** Logs the current subscription counts, one {@code key=value} entry per line, sorted by key. */
    private void printSubscriptionStats() {
        String report;
        // Collection views of a synchronizedMap must be traversed while holding the map's monitor;
        // otherwise a concurrent hook callback can trigger a ConcurrentModificationException.
        synchronized (subscriptionStats) {
            report = subscriptionStats.entrySet().stream()
                    .sorted(Map.Entry.comparingByKey())
                    .map(Object::toString)
                    .collect(Collectors.joining("\n"));
        }
        // Logging happens outside the lock so hook callbacks are not blocked on log I/O.
        LOG.info("Active Reactor subscriptions: \n{}", report);
    }

    /**
     * Registers an {@code onLastOperator} hook that increments the counter of the calling frame
     * and wraps the publisher with a {@code doFinally} that decrements it again.
     *
     * <p>Only {@link Mono} and {@link Flux} publishers are counted, because only those get the
     * decrementing wrapper; any other publisher is returned untouched and uncounted so it cannot
     * show up as a subscription that never terminates.
     */
    private void registerReactorHooks() {
        Hooks.onLastOperator(objectPublisher -> {
            if (objectPublisher instanceof Mono) {
                Optional<String> callerFrameLoc = registerSubscriptionFrame();
                return Hooks.convertToMonoBypassingHooks(objectPublisher, false)
                        .doFinally(signalType -> callerFrameLoc.ifPresent(this::unregisterSubscriptionFrame));
            } else if (objectPublisher instanceof Flux) {
                Optional<String> callerFrameLoc = registerSubscriptionFrame();
                return Hooks.convertToFluxBypassingHooks(objectPublisher)
                        .doFinally(signalType -> callerFrameLoc.ifPresent(this::unregisterSubscriptionFrame));
            } else {
                return objectPublisher;
            }
        });
    }

    /**
     * Finds the first stack frame outside {@code java.*}, {@code reactor.*} and this class and, if
     * there is one, increments its counter.
     *
     * @return the string form of the counted frame, or empty if no such frame exists
     */
    private Optional<String> registerSubscriptionFrame() {
        Optional<String> callerFrameLoc = StackWalker.getInstance()
                .walk(stackFrameStream -> stackFrameStream
                        .filter(stackFrame -> !stackFrame.getClassName().startsWith("java."))
                        .filter(stackFrame -> !stackFrame.getClassName().contains(this.getClass().getName()))
                        .filter(stackFrame -> !stackFrame.getClassName().startsWith("reactor."))
                        .map(Object::toString)
                        .findFirst());
        callerFrameLoc.ifPresent(stackFrame -> subscriptionStats.merge(stackFrame, 1, Integer::sum));
        return callerFrameLoc;
    }

    /**
     * Decrements the counter of the given frame, removing the entry once it reaches zero.
     *
     * <p>An unknown frame is ignored rather than inserted, so a stray decrement can never create a
     * phantom entry.
     *
     * @param callerFrame key previously counted for the subscription
     */
    private void unregisterSubscriptionFrame(String callerFrame) {
        subscriptionStats.computeIfPresent(callerFrame, (frame, active) -> {
            int remaining = active - 1;
            if (remaining == 0) {
                // Returning null removes the mapping.
                return null;
            } else {
                return remaining;
            }
        });
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment