Created
February 27, 2019 13:42
-
-
Save artem-v/5f0ec7cf417bf733177262ff181f61b2 to your computer and use it in GitHub Desktop.
RSocketNetty-ThroughputTest
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Settings shared by the throughput samples.
 *
 * <p>Every constant is resolved from a JVM system property when this interface is initialized,
 * falling back to the default given in its initializer. Boolean settings use
 * {@link Boolean#getBoolean(String)}, so they are {@code false} unless the property is set to
 * {@code "true"} (case-insensitive).
 */
public interface Configurations {

  // ---------------------------------------------------------------------------------------------
  // Message sizes and counts
  // ---------------------------------------------------------------------------------------------

  /** Property {@code reactor.aeron.sample.messageLength}; default 32. */
  int MESSAGE_LENGTH = Integer.getInteger("reactor.aeron.sample.messageLength", 32);

  /** Property {@code reactor.aeron.sample.messages}; default 100,000,000. */
  long NUMBER_OF_MESSAGES = Long.getLong("reactor.aeron.sample.messages", 100_000_000L);

  /** Property {@code reactor.aeron.sample.warmup.messages}; default 10,000. */
  long WARMUP_NUMBER_OF_MESSAGES = Long.getLong("reactor.aeron.sample.warmup.messages", 10_000L);

  /** Property {@code reactor.aeron.sample.warmup.iterations}; default 5. */
  int WARMUP_NUMBER_OF_ITERATIONS =
      Integer.getInteger("reactor.aeron.sample.warmup.iterations", 5);

  /**
   * Property {@code reactor.aeron.sample.frameCountLimit}; default 10. Note that the property key
   * says "frame" while the constant says "fragment".
   */
  int FRAGMENT_COUNT_LIMIT = Integer.getInteger("reactor.aeron.sample.frameCountLimit", 10);

  // ---------------------------------------------------------------------------------------------
  // Boolean switches (false unless the property equals "true")
  // ---------------------------------------------------------------------------------------------

  /** Property {@code reactor.aeron.sample.info}. */
  boolean INFO_FLAG = Boolean.getBoolean("reactor.aeron.sample.info");

  /** Property {@code reactor.aeron.sample.embeddedMediaDriver}. */
  boolean EMBEDDED_MEDIA_DRIVER = Boolean.getBoolean("reactor.aeron.sample.embeddedMediaDriver");

  /** Property {@code reactor.aeron.sample.exclusive.publications}. */
  boolean EXCLUSIVE_PUBLICATIONS =
      Boolean.getBoolean("reactor.aeron.sample.exclusive.publications");

  // ---------------------------------------------------------------------------------------------
  // Ping / pong channels and stream ids
  // ---------------------------------------------------------------------------------------------

  /** Property {@code reactor.aeron.sample.ping.channel}; default targets localhost:40123. */
  String PING_CHANNEL =
      System.getProperty("reactor.aeron.sample.ping.channel", "aeron:udp?endpoint=localhost:40123");

  /** Property {@code reactor.aeron.sample.pong.channel}; default targets localhost:40124. */
  String PONG_CHANNEL =
      System.getProperty("reactor.aeron.sample.pong.channel", "aeron:udp?endpoint=localhost:40124");

  /** Property {@code reactor.aeron.sample.ping.streamId}; default 10. */
  int PING_STREAM_ID = Integer.getInteger("reactor.aeron.sample.ping.streamId", 10);

  /** Property {@code reactor.aeron.sample.pong.streamId}; default 10 (same default as ping). */
  int PONG_STREAM_ID = Integer.getInteger("reactor.aeron.sample.pong.streamId", 10);

  // ---------------------------------------------------------------------------------------------
  // "mdc" endpoint settings
  // ---------------------------------------------------------------------------------------------

  /** Property {@code reactor.aeron.sample.mdc.address}; default {@code "localhost"}. */
  String MDC_ADDRESS = System.getProperty("reactor.aeron.sample.mdc.address", "localhost");

  /** Property {@code reactor.aeron.sample.mdc.port}; default 13000. */
  int MDC_PORT = Integer.getInteger("reactor.aeron.sample.mdc.port", 13000);

  /** Property {@code reactor.aeron.sample.mdc.control.port}; default 13001. */
  int MDC_CONTROL_PORT = Integer.getInteger("reactor.aeron.sample.mdc.control.port", 13001);

  /**
   * Property {@code reactor.aeron.sample.mdc.stream.id}; default {@code 0xcafe0000}, which is a
   * negative value when read as a signed {@code int}.
   */
  int MDC_STREAM_ID = Integer.getInteger("reactor.aeron.sample.mdc.stream.id", 0xcafe0000);

  /** Property {@code reactor.aeron.sample.mdc.session.id}; default 1001. */
  int MDC_SESSION_ID = Integer.getInteger("reactor.aeron.sample.mdc.session.id", 1001);

  // ---------------------------------------------------------------------------------------------
  // Request size, idle strategy and reporting
  // ---------------------------------------------------------------------------------------------

  /** Property {@code reactor.aeron.sample.request}; default 8. */
  int REQUESTED = Integer.getInteger("reactor.aeron.sample.request", 8);

  /** Property {@code reactor.aeron.sample.idle.strategy}; default {@code "busyspin"}. */
  String IDLE_STRATEGY = System.getProperty("reactor.aeron.sample.idle.strategy", "busyspin");

  /**
   * Property {@code reactor.aeron.sample.report.interval}; default 1. The time unit is decided by
   * whoever consumes this value and is not visible here.
   */
  long REPORT_INTERVAL = Long.getLong("reactor.aeron.sample.report.interval", 1L);

  /**
   * Property {@code reactor.aeron.sample.report.delay}; defaults to {@link #REPORT_INTERVAL}, so it
   * must be declared after that constant.
   */
  long WARMUP_REPORT_DELAY = Long.getLong("reactor.aeron.sample.report.delay", REPORT_INTERVAL);
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import io.netty.channel.ChannelOption; | |
import io.rsocket.Frame; | |
import io.rsocket.Payload; | |
import io.rsocket.RSocket; | |
import io.rsocket.RSocketFactory; | |
import io.rsocket.transport.netty.client.TcpClientTransport; | |
import io.rsocket.util.ByteBufPayload; | |
import reactor.aeron.Configurations; | |
import reactor.aeron.RateReporter; | |
import reactor.netty.resources.ConnectionProvider; | |
import reactor.netty.resources.LoopResources; | |
import reactor.netty.tcp.TcpClient; | |
public final class RSocketNettyClientTps { | |
/** | |
* Main runner. | |
* | |
* @param args program arguments. | |
*/ | |
public static void main(String... args) { | |
System.out.println( | |
"message size: " | |
+ Configurations.MESSAGE_LENGTH | |
+ ", number of messages: " | |
+ Configurations.NUMBER_OF_MESSAGES | |
+ ", address: " | |
+ Configurations.MDC_ADDRESS | |
+ ", port: " | |
+ Configurations.MDC_PORT); | |
LoopResources loopResources = LoopResources.create("rsocket-netty"); | |
TcpClient tcpClient = | |
TcpClient.create(ConnectionProvider.newConnection()) | |
.runOn(loopResources) | |
.host(Configurations.MDC_ADDRESS) | |
.port(Configurations.MDC_PORT) | |
.option(ChannelOption.TCP_NODELAY, true) | |
.option(ChannelOption.SO_KEEPALIVE, true) | |
.option(ChannelOption.SO_REUSEADDR, true) | |
.doOnConnected(System.out::println); | |
RSocket client = | |
RSocketFactory.connect() | |
.frameDecoder(Frame::retain) | |
.transport(() -> TcpClientTransport.create(tcpClient)) | |
.start() | |
.doOnSuccess(System.out::println) | |
.block(); | |
RateReporter reporter = new RateReporter(); | |
Payload request = ByteBufPayload.create("hello"); | |
client | |
.requestStream(request) | |
.doOnNext( | |
payload -> { | |
reporter.onMessage(1, payload.sliceData().readableBytes()); | |
payload.release(); | |
}) | |
.doOnError(Throwable::printStackTrace) | |
.doFinally(s -> reporter.dispose()) | |
.then() | |
.block(); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import io.netty.buffer.ByteBuf; | |
import io.netty.buffer.ByteBufAllocator; | |
import io.netty.channel.ChannelOption; | |
import io.rsocket.AbstractRSocket; | |
import io.rsocket.Frame; | |
import io.rsocket.Payload; | |
import io.rsocket.RSocketFactory; | |
import io.rsocket.transport.netty.server.TcpServerTransport; | |
import io.rsocket.util.ByteBufPayload; | |
import java.util.Random; | |
import reactor.aeron.Configurations; | |
import reactor.core.publisher.Flux; | |
import reactor.core.publisher.Mono; | |
import reactor.core.scheduler.Schedulers; | |
import reactor.netty.resources.LoopResources; | |
import reactor.netty.tcp.TcpServer; | |
public final class RSocketNettyServerTps { | |
private static final ByteBuf BUFFER = | |
ByteBufAllocator.DEFAULT.buffer(Configurations.MESSAGE_LENGTH); | |
static { | |
Random random = new Random(System.nanoTime()); | |
byte[] bytes = new byte[Configurations.MESSAGE_LENGTH]; | |
random.nextBytes(bytes); | |
BUFFER.writeBytes(bytes); | |
} | |
/** | |
* Main runner. | |
* | |
* @param args program arguments. | |
*/ | |
public static void main(String... args) { | |
System.out.println( | |
"message size: " | |
+ Configurations.MESSAGE_LENGTH | |
+ ", number of messages: " | |
+ Configurations.NUMBER_OF_MESSAGES | |
+ ", address: " | |
+ Configurations.MDC_ADDRESS | |
+ ", port: " | |
+ Configurations.MDC_PORT); | |
LoopResources loopResources = LoopResources.create("rsocket-netty"); | |
TcpServer tcpServer = | |
TcpServer.create() | |
.runOn(loopResources) | |
.host(Configurations.MDC_ADDRESS) | |
.port(Configurations.MDC_PORT) | |
.option(ChannelOption.TCP_NODELAY, true) | |
.option(ChannelOption.SO_KEEPALIVE, true) | |
.option(ChannelOption.SO_REUSEADDR, true) | |
.doOnConnection(System.out::println); | |
RSocketFactory.receive() | |
.frameDecoder(Frame::retain) | |
.acceptor( | |
(setupPayload, rsocket) -> { | |
System.out.println(rsocket); | |
return Mono.just( | |
new AbstractRSocket() { | |
@Override | |
public Flux<Payload> requestStream(Payload payload) { | |
payload.release(); | |
long msgNum = Configurations.NUMBER_OF_MESSAGES; | |
System.out.println("streaming " + msgNum + " messages ..."); | |
// Callable<Payload> payloadCallable = | |
// () -> ByteBufPayload.create(BUFFER.retainedSlice()); | |
// return Mono.fromCallable(payloadCallable) | |
// .subscribeOn(Schedulers.parallel()) | |
// .repeat(msgNum); | |
return Flux.range(1, (int) msgNum) | |
.map(i -> ByteBufPayload.create(BUFFER.retainedSlice())) | |
.subscribeOn(Schedulers.parallel()); | |
} | |
}); | |
}) | |
.transport(() -> TcpServerTransport.create(tcpServer)) | |
.start() | |
.block() | |
.onClose() | |
.block(); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment