Skip to content

Instantly share code, notes, and snippets.

@krisskross
Created November 28, 2015 21:19
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save krisskross/cf2fa43c1a7c073f3a12 to your computer and use it in GitHub Desktop.
Save krisskross/cf2fa43c1a7c073f3a12 to your computer and use it in GitHub Desktop.
RxNettyTestWorking
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.reactivex.netty.RxNetty;
import io.reactivex.netty.channel.ObservableConnection;
import io.reactivex.netty.server.RxServer;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicInteger;
public class RxNettyTestWorking {
public static void main(String[] args) throws Exception {
RxServer<ByteBuf, ByteBuf> tcpServer = RxNetty.createTcpServer(1111, connection -> {
System.out.println("Connection open");
return connection.getInput()
.flatMap(byteBuf -> connection.writeAndFlush(Unpooled.copiedBuffer(byteBuf)));
}).start();
ObservableConnection<ByteBuf, ByteBuf> connection = RxNetty.createTcpClient("localhost", 1111)
.connect().toBlocking().first();
System.out.println("Client sending");
AtomicInteger count = new AtomicInteger();
Iterator<ByteBuf> byteBufs = connection.getInput().map(b -> Unpooled.copiedBuffer(b))
.toBlocking().toIterable().iterator();
for (int j = 0; j < 100; j++) {
long now = System.currentTimeMillis();
for (int i = 0; i < 1000; i++) {
ByteBuf msg = Unpooled.buffer().writeInt(count.incrementAndGet());
connection.writeAndFlush(msg).toBlocking().subscribe();
ByteBuf byteBuf = byteBufs.next();
}
System.out.println(System.currentTimeMillis() - now);
}
System.out.println("Done");
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment