Skip to content

Instantly share code, notes, and snippets.

@artem-v
Created February 27, 2019 13:42
Show Gist options
  • Save artem-v/5f0ec7cf417bf733177262ff181f61b2 to your computer and use it in GitHub Desktop.
Save artem-v/5f0ec7cf417bf733177262ff181f61b2 to your computer and use it in GitHub Desktop.
RSocketNetty-ThroughputTest
public interface Configurations {
int FRAGMENT_COUNT_LIMIT = Integer.getInteger("reactor.aeron.sample.frameCountLimit", 10);
int MESSAGE_LENGTH = Integer.getInteger("reactor.aeron.sample.messageLength", 32);
int WARMUP_NUMBER_OF_ITERATIONS = Integer.getInteger("reactor.aeron.sample.warmup.iterations", 5);
long WARMUP_NUMBER_OF_MESSAGES = Long.getLong("reactor.aeron.sample.warmup.messages", 10_000);
long NUMBER_OF_MESSAGES = Long.getLong("reactor.aeron.sample.messages", 100_000_000);
boolean EXCLUSIVE_PUBLICATIONS =
Boolean.getBoolean("reactor.aeron.sample.exclusive.publications");
boolean EMBEDDED_MEDIA_DRIVER = Boolean.getBoolean("reactor.aeron.sample.embeddedMediaDriver");
boolean INFO_FLAG = Boolean.getBoolean("reactor.aeron.sample.info");
int PING_STREAM_ID = Integer.getInteger("reactor.aeron.sample.ping.streamId", 10);
int PONG_STREAM_ID = Integer.getInteger("reactor.aeron.sample.pong.streamId", 10);
String PING_CHANNEL =
System.getProperty("reactor.aeron.sample.ping.channel", "aeron:udp?endpoint=localhost:40123");
String PONG_CHANNEL =
System.getProperty("reactor.aeron.sample.pong.channel", "aeron:udp?endpoint=localhost:40124");
String MDC_ADDRESS = System.getProperty("reactor.aeron.sample.mdc.address", "localhost");
int MDC_PORT = Integer.getInteger("reactor.aeron.sample.mdc.port", 13000);
int MDC_CONTROL_PORT = Integer.getInteger("reactor.aeron.sample.mdc.control.port", 13001);
int MDC_STREAM_ID = Integer.getInteger("reactor.aeron.sample.mdc.stream.id", 0xcafe0000);
int MDC_SESSION_ID = Integer.getInteger("reactor.aeron.sample.mdc.session.id", 1001);
int REQUESTED = Integer.getInteger("reactor.aeron.sample.request", 8);
String IDLE_STRATEGY = System.getProperty("reactor.aeron.sample.idle.strategy", "busyspin");
long REPORT_INTERVAL = Long.getLong("reactor.aeron.sample.report.interval", 1);
long WARMUP_REPORT_DELAY = Long.getLong("reactor.aeron.sample.report.delay", REPORT_INTERVAL);
}
import io.netty.channel.ChannelOption;
import io.rsocket.Frame;
import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.RSocketFactory;
import io.rsocket.transport.netty.client.TcpClientTransport;
import io.rsocket.util.ByteBufPayload;
import reactor.aeron.Configurations;
import reactor.aeron.RateReporter;
import reactor.netty.resources.ConnectionProvider;
import reactor.netty.resources.LoopResources;
import reactor.netty.tcp.TcpClient;
public final class RSocketNettyClientTps {
/**
* Main runner.
*
* @param args program arguments.
*/
public static void main(String... args) {
System.out.println(
"message size: "
+ Configurations.MESSAGE_LENGTH
+ ", number of messages: "
+ Configurations.NUMBER_OF_MESSAGES
+ ", address: "
+ Configurations.MDC_ADDRESS
+ ", port: "
+ Configurations.MDC_PORT);
LoopResources loopResources = LoopResources.create("rsocket-netty");
TcpClient tcpClient =
TcpClient.create(ConnectionProvider.newConnection())
.runOn(loopResources)
.host(Configurations.MDC_ADDRESS)
.port(Configurations.MDC_PORT)
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.SO_REUSEADDR, true)
.doOnConnected(System.out::println);
RSocket client =
RSocketFactory.connect()
.frameDecoder(Frame::retain)
.transport(() -> TcpClientTransport.create(tcpClient))
.start()
.doOnSuccess(System.out::println)
.block();
RateReporter reporter = new RateReporter();
Payload request = ByteBufPayload.create("hello");
client
.requestStream(request)
.doOnNext(
payload -> {
reporter.onMessage(1, payload.sliceData().readableBytes());
payload.release();
})
.doOnError(Throwable::printStackTrace)
.doFinally(s -> reporter.dispose())
.then()
.block();
}
}
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelOption;
import io.rsocket.AbstractRSocket;
import io.rsocket.Frame;
import io.rsocket.Payload;
import io.rsocket.RSocketFactory;
import io.rsocket.transport.netty.server.TcpServerTransport;
import io.rsocket.util.ByteBufPayload;
import java.util.Random;
import reactor.aeron.Configurations;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.netty.resources.LoopResources;
import reactor.netty.tcp.TcpServer;
public final class RSocketNettyServerTps {
private static final ByteBuf BUFFER =
ByteBufAllocator.DEFAULT.buffer(Configurations.MESSAGE_LENGTH);
static {
Random random = new Random(System.nanoTime());
byte[] bytes = new byte[Configurations.MESSAGE_LENGTH];
random.nextBytes(bytes);
BUFFER.writeBytes(bytes);
}
/**
* Main runner.
*
* @param args program arguments.
*/
public static void main(String... args) {
System.out.println(
"message size: "
+ Configurations.MESSAGE_LENGTH
+ ", number of messages: "
+ Configurations.NUMBER_OF_MESSAGES
+ ", address: "
+ Configurations.MDC_ADDRESS
+ ", port: "
+ Configurations.MDC_PORT);
LoopResources loopResources = LoopResources.create("rsocket-netty");
TcpServer tcpServer =
TcpServer.create()
.runOn(loopResources)
.host(Configurations.MDC_ADDRESS)
.port(Configurations.MDC_PORT)
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.SO_REUSEADDR, true)
.doOnConnection(System.out::println);
RSocketFactory.receive()
.frameDecoder(Frame::retain)
.acceptor(
(setupPayload, rsocket) -> {
System.out.println(rsocket);
return Mono.just(
new AbstractRSocket() {
@Override
public Flux<Payload> requestStream(Payload payload) {
payload.release();
long msgNum = Configurations.NUMBER_OF_MESSAGES;
System.out.println("streaming " + msgNum + " messages ...");
// Callable<Payload> payloadCallable =
// () -> ByteBufPayload.create(BUFFER.retainedSlice());
// return Mono.fromCallable(payloadCallable)
// .subscribeOn(Schedulers.parallel())
// .repeat(msgNum);
return Flux.range(1, (int) msgNum)
.map(i -> ByteBufPayload.create(BUFFER.retainedSlice()))
.subscribeOn(Schedulers.parallel());
}
});
})
.transport(() -> TcpServerTransport.create(tcpServer))
.start()
.block()
.onClose()
.block();
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment