Skip to content

Instantly share code, notes, and snippets.

@segabriel
Created January 13, 2020 12:19
Show Gist options
  • Save segabriel/b8a6a189ff89a9560a279cd97cc4a8fe to your computer and use it in GitHub Desktop.
Save segabriel/b8a6a189ff89a9560a279cd97cc4a8fe to your computer and use it in GitHub Desktop.
package examples;
import io.netty.buffer.ByteBuf;
import io.netty.util.ResourceLeakDetector;
import io.netty.util.ResourceLeakDetector.Level;
import io.rsocket.AbstractRSocket;
import io.rsocket.ConnectionSetupPayload;
import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.RSocketFactory;
import io.rsocket.SocketAcceptor;
import io.rsocket.frame.decoder.PayloadDecoder;
import io.rsocket.transport.netty.client.TcpClientTransport;
import io.rsocket.transport.netty.server.CloseableChannel;
import io.rsocket.transport.netty.server.TcpServerTransport;
import io.rsocket.util.ByteBufPayload;
import java.util.Objects;
import java.util.Random;
import java.util.stream.LongStream;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
public class Example {
static {
ResourceLeakDetector.setLevel(Level.PARANOID);
}
public static void main(String[] args) {
CloseableChannel server = null;
RSocket client = null;
int garbage = 1000;
try {
for (int j = 0; j < 1000; j++) {
// System.out.println("iteration " + j);
server =
Objects.requireNonNull(
RSocketFactory.receive()
.frameDecoder(PayloadDecoder.ZERO_COPY)
.errorConsumer(th -> System.err.println("[Server]: " + th.getMessage()))
.acceptor(new SocketAcceptorImpl())
.transport(TcpServerTransport.create("localhost", 7000))
.start()
.doOnError(th -> System.err.println("[Server bind]: " + th.getMessage()))
.block());
client =
Objects.requireNonNull(
RSocketFactory.connect()
.frameDecoder(PayloadDecoder.ZERO_COPY)
.errorConsumer(th -> System.err.println("[Client]: " + th.getMessage()))
.transport(TcpClientTransport.create("localhost", 7000))
.start()
.doOnError(th -> System.err.println("[Client connect]: " + th.getMessage()))
.block());
for (int i = 0; i < 1000; i++) {
// System.out.println(i);
client
.requestStream(ByteBufPayload.create("data" + i, "metadata" + i))
.doOnNext(
p -> {
ByteBuf data = p.sliceData().retain();
ByteBuf metadata = p.sliceMetadata().retain();
p.release();
data.release();
metadata.release();
})
.blockFirst();
garbage = generateGarbage(garbage);
}
client.dispose();
server.dispose();
client.onClose().block();
server.onClose().block();
}
} catch (Throwable th) {
th.printStackTrace();
if (client != null) {
client.dispose();
}
if (server != null) {
server.dispose();
}
}
}
private static class SocketAcceptorImpl implements SocketAcceptor {
@Override
public Mono<RSocket> accept(ConnectionSetupPayload setupPayload, RSocket reactiveSocket) {
return Mono.just(
new AbstractRSocket() {
@Override
public Flux<Payload> requestStream(Payload payload) {
// System.out.println(payload.getDataUtf8());
payload.release();
return Flux.defer(() -> Flux.fromStream(LongStream.range(0, 30).boxed()))
.map(i -> ByteBufPayload.create("data" + i, "metadata" + i));
}
});
}
}
public static final Random RANDOM = new Random(42);
private static int generateGarbage(int length) {
Long[][] longs = new Long[RANDOM.nextInt(10) + 1][];
for (int i = 0; i < longs.length; i++) {
longs[i] = new Long[length];
for (int j = 0; j < length; j++) {
longs[i][j] = (long) j;
}
}
return RANDOM.nextLong() == longs[0][RANDOM.nextInt(length)] ? 1024 : 1024 * 100;
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment