Skip to content

Instantly share code, notes, and snippets.

@NiteshKant
Last active February 1, 2017 23:10
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save NiteshKant/9bf665902815a1209bdda228ebbbd866 to your computer and use it in GitHub Desktop.
Save NiteshKant/9bf665902815a1209bdda228ebbbd866 to your computer and use it in GitHub Desktop.
rs-java-issue-229
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import io.reactivesocket.ReactiveSocket;
import io.reactivesocket.client.KeepAliveProvider;
import io.reactivesocket.client.ReactiveSocketClient;
import io.reactivesocket.client.SetupProvider;
import io.reactivesocket.frame.ByteBufferUtil;
import io.reactivesocket.transport.tcp.client.TcpTransportClient;
import io.reactivesocket.util.PayloadImpl;
import io.reactivex.Flowable;
import io.reactivex.functions.Consumer;
import io.reactivex.schedulers.Schedulers;
public class Client {
public static class Performance {
final String url;
final int count;
final double avgSize;
public Performance(String url, int count, double avgSize) {
super();
this.url = url;
this.count = count;
this.avgSize = avgSize;
}
public String getUrl() {
return url;
}
public int getCount() {
return count;
}
public double getAvgSize() {
return avgSize;
}
@Override
public String toString() {
return "Performance [url=" + url + ", count=" + count + ", avgSize=" + avgSize + "]";
}
}
public static Flowable<Performance> subscribe(ReactiveSocket socket, String request) {
final AtomicLong received = new AtomicLong();
final AtomicLong requested = new AtomicLong();
return Flowable.fromPublisher(
socket.requestSubscription(new PayloadImpl(request)))
.map(payload -> payload.getData())
.map(ByteBufferUtil::toUtf8String)
.doOnRequest(n -> requested.addAndGet(n))
.doOnNext(s -> received.incrementAndGet())
.observeOn(Schedulers.from(Executors.newSingleThreadExecutor())) // move one down to "fix" it
.buffer(128)
.map(l-> {
double avgSize = l
.stream()
.mapToInt(String::length)
.average()
.orElse(0.0);
return new Performance(request, l.size(), avgSize);
})
.doFinally(() -> System.out.println("Requested: " + requested.get() + ", Received: " + received.get()));
}
public static void main(String[] args) throws InterruptedException {
int port = 9000;
String host = "localhost";
SocketAddress address = new InetSocketAddress(host, port);
ReactiveSocket socket = Flowable.fromPublisher(ReactiveSocketClient.create(TcpTransportClient.create(address),
SetupProvider.keepAlive(KeepAliveProvider.never()).disableLease()).connect()).blockingFirst();
for (int i = 0; i < 1; i++) {
subscribe(socket, "localhost:4096:Object"+i)
.forEach(System.out::println);
}
Thread.sleep(10000000);
}
}
#### Server Output
01 Feb 2017 15:02:08,546 INFO [main] - Rx server started at port: 9000
01 Feb 2017 15:02:15,777 INFO [rxnetty-nio-eventloop-1-2] - Creating thread pooled named io.reactivesocket.frame.UnpooledFrame
Got request for [localhost, 4096, Object0]
Requested: 13986. Sent: 13986
#### Client Output
01 Feb 2017 15:02:15,529 INFO [main] - Creating thread pooled named io.reactivesocket.frame.UnpooledFrame
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0]
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0]
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0]
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0]
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0]
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0]
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0]
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0]
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0]
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0]
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0]
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0]
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0]
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0]
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0]
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0]
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0]
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0]
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0]
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0]
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0]
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0]
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0]
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0]
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0]
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0]
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0]
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0]
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0]
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0]
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0]
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0]
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0]
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0]
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0]
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0]
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0]
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0]
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0]
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0]
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0]
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0]
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0]
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0]
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0]
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0]
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0]
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0]
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0]
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0]
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0]
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0]
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0]
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0]
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0]
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0]
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0]
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0]
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0]
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0]
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0]
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0]
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0]
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0]
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0]
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0]
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0]
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0]
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0]
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0]
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0]
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0]
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0]
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0]
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0]
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0]
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0]
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0]
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0]
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0]
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0]
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0]
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0]
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0]
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0]
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0]
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0]
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0]
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0]
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0]
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0]
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0]
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0]
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0]
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0]
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0]
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0]
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0]
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0]
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0]
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0]
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0]
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0]
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0]
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0]
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0]
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0]
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0]
io.reactivex.exceptions.MissingBackpressureException: Queue is full?!
at io.reactivex.internal.operators.flowable.FlowableObserveOn$BaseObserveOnSubscriber.onNext(FlowableObserveOn.java:113)
at io.reactivex.internal.operators.flowable.FlowableDoOnEach$DoOnEachSubscriber.onNext(FlowableDoOnEach.java:89)
at io.reactivex.internal.operators.flowable.FlowableDoOnLifecycle$SubscriptionLambdaSubscriber.onNext(FlowableDoOnLifecycle.java:79)
at io.reactivex.internal.operators.flowable.FlowableMap$MapSubscriber.onNext(FlowableMap.java:67)
at io.reactivex.internal.operators.flowable.FlowableMap$MapSubscriber.onNext(FlowableMap.java:67)
at io.reactivesocket.reactivestreams.extensions.internal.ValidatingSubscription.safeOnNext(ValidatingSubscription.java:94)
at io.reactivesocket.internal.RemoteReceiver.onNext(RemoteReceiver.java:166)
at io.reactivesocket.internal.RemoteReceiver.onNext(RemoteReceiver.java:54)
at io.reactivesocket.ClientReactiveSocket.handleFrame(ClientReactiveSocket.java:295)
at io.reactivesocket.ClientReactiveSocket.handleIncomingFrames(ClientReactiveSocket.java:232)
at io.reactivesocket.reactivestreams.extensions.internal.publishers.DoOnEventPublisher$1.onNext(DoOnEventPublisher.java:73)
at io.reactivesocket.reactivestreams.extensions.internal.publishers.DoOnEventPublisher$1.onNext(DoOnEventPublisher.java:75)
at io.reactivesocket.reactivestreams.extensions.internal.ValidatingSubscription.safeOnNext(ValidatingSubscription.java:94)
at io.reactivesocket.internal.ClientServerInputMultiplexer$SourceInput.onNext(ClientServerInputMultiplexer.java:107)
at io.reactivesocket.internal.ClientServerInputMultiplexer$SourceInput.onNext(ClientServerInputMultiplexer.java:71)
at rx.internal.reactivestreams.PublisherAdapter$1.onNext(PublisherAdapter.java:107)
at rx.observers.SafeSubscriber.onNext(SafeSubscriber.java:134)
at rx.internal.operators.OperatorSerialize$1.onNext(OperatorSerialize.java:57)
at rx.observers.SerializedObserver.onNext(SerializedObserver.java:91)
at rx.observers.SerializedSubscriber.onNext(SerializedSubscriber.java:94)
at io.reactivex.netty.channel.AbstractConnectionToChannelBridge$ReadProducer.sendOnNext(AbstractConnectionToChannelBridge.java:373)
at io.reactivex.netty.channel.AbstractConnectionToChannelBridge.newMessage(AbstractConnectionToChannelBridge.java:189)
at io.reactivex.netty.channel.BackpressureManagingHandler.channelRead(BackpressureManagingHandler.java:77)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:350)
at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:350)
at io.reactivesocket.transport.tcp.ReactiveSocketFrameCodec.channelRead(ReactiveSocketFrameCodec.java:43)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:350)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:293)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:280)
at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:396)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:248)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:350)
at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
at io.netty.handler.codec.MessageToMessageCodec.channelRead(MessageToMessageCodec.java:111)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:350)
at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:350)
at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86)
at io.reactivex.netty.channel.BytesInspector.channelRead(BytesInspector.java:56)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:350)
at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:350)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1334)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:926)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:129)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:610)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:551)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:465)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:437)
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:873)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
at java.lang.Thread.run(Thread.java:745)
Exception in thread "pool-2-thread-1" io.reactivex.exceptions.MissingBackpressureException: Queue is full?!
at io.reactivex.internal.operators.flowable.FlowableObserveOn$BaseObserveOnSubscriber.onNext(FlowableObserveOn.java:113)
at io.reactivex.internal.operators.flowable.FlowableDoOnEach$DoOnEachSubscriber.onNext(FlowableDoOnEach.java:89)
at io.reactivex.internal.operators.flowable.FlowableDoOnLifecycle$SubscriptionLambdaSubscriber.onNext(FlowableDoOnLifecycle.java:79)
at io.reactivex.internal.operators.flowable.FlowableMap$MapSubscriber.onNext(FlowableMap.java:67)
at io.reactivex.internal.operators.flowable.FlowableMap$MapSubscriber.onNext(FlowableMap.java:67)
at io.reactivesocket.reactivestreams.extensions.internal.ValidatingSubscription.safeOnNext(ValidatingSubscription.java:94)
at io.reactivesocket.internal.RemoteReceiver.onNext(RemoteReceiver.java:166)
at io.reactivesocket.internal.RemoteReceiver.onNext(RemoteReceiver.java:54)
at io.reactivesocket.ClientReactiveSocket.handleFrame(ClientReactiveSocket.java:295)
at io.reactivesocket.ClientReactiveSocket.handleIncomingFrames(ClientReactiveSocket.java:232)
at io.reactivesocket.reactivestreams.extensions.internal.publishers.DoOnEventPublisher$1.onNext(DoOnEventPublisher.java:73)
at io.reactivesocket.reactivestreams.extensions.internal.publishers.DoOnEventPublisher$1.onNext(DoOnEventPublisher.java:75)
at io.reactivesocket.reactivestreams.extensions.internal.ValidatingSubscription.safeOnNext(ValidatingSubscription.java:94)
at io.reactivesocket.internal.ClientServerInputMultiplexer$SourceInput.onNext(ClientServerInputMultiplexer.java:107)
at io.reactivesocket.internal.ClientServerInputMultiplexer$SourceInput.onNext(ClientServerInputMultiplexer.java:71)
at rx.internal.reactivestreams.PublisherAdapter$1.onNext(PublisherAdapter.java:107)
at rx.observers.SafeSubscriber.onNext(SafeSubscriber.java:134)
at rx.internal.operators.OperatorSerialize$1.onNext(OperatorSerialize.java:57)
at rx.observers.SerializedObserver.onNext(SerializedObserver.java:91)
at rx.observers.SerializedSubscriber.onNext(SerializedSubscriber.java:94)
at io.reactivex.netty.channel.AbstractConnectionToChannelBridge$ReadProducer.sendOnNext(AbstractConnectionToChannelBridge.java:373)
at io.reactivex.netty.channel.AbstractConnectionToChannelBridge.newMessage(AbstractConnectionToChannelBridge.java:189)
at io.reactivex.netty.channel.BackpressureManagingHandler.channelRead(BackpressureManagingHandler.java:77)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:350)
at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:350)
at io.reactivesocket.transport.tcp.ReactiveSocketFrameCodec.channelRead(ReactiveSocketFrameCodec.java:43)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:350)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:293)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:280)
at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:396)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:248)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:350)
at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
at io.netty.handler.codec.MessageToMessageCodec.channelRead(MessageToMessageCodec.java:111)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:350)
at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:350)
at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86)
at io.reactivex.netty.channel.BytesInspector.channelRead(BytesInspector.java:56)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:350)
at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:350)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1334)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:926)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:129)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:610)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:551)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:465)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:437)
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:873)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
at java.lang.Thread.run(Thread.java:745)
__Requested: 13952, Received: 13910__
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.reactivestreams.Publisher;
import io.reactivesocket.AbstractReactiveSocket;
import io.reactivesocket.Payload;
import io.reactivesocket.frame.ByteBufferUtil;
import io.reactivesocket.lease.DisabledLeaseAcceptingSocket;
import io.reactivesocket.server.ReactiveSocketServer;
import io.reactivesocket.transport.tcp.server.TcpTransportServer;
import io.reactivesocket.util.PayloadImpl;
import io.reactivex.Flowable;
import org.slf4j.event.Level;
public class Server {
public static void main(String[] args) throws IOException {
int port = 9000;
ReactiveSocketServer.create(TcpTransportServer.create(port))
.start((setupPayload, reactiveSocket) -> {
return new DisabledLeaseAcceptingSocket(new AbstractReactiveSocket() {
@Override
public Publisher<Payload> requestResponse(Payload p) {
return Flowable.just(p);
}
@Override
public Publisher<Payload> requestSubscription(Payload p) {
String[] request = ByteBufferUtil.toUtf8String(p.getData()).split(":");
int size = Integer.parseInt(request[1]);
System.out.println("Got request for " + Arrays.toString(request));
final byte[] buff = new byte[size];
Arrays.fill(buff, (byte)65);
final AtomicLong requested = new AtomicLong();
final AtomicLong sent = new AtomicLong();
Flowable.interval(30, TimeUnit.SECONDS)
.doOnNext(aLong -> System.out.println("Requested: " + requested.get()
+ ". Sent: " + sent.get()))
.ignoreElements()
.ambWith(Flowable.fromPublisher(reactiveSocket.onClose()).ignoreElements())
.doFinally(() -> System.out.println("Requested: " + requested.get()
+ ". Sent: " + sent.get()))
.subscribe();
return Flowable.range(1, Integer.MAX_VALUE)
.doOnRequest(n -> requested.addAndGet(n))
.map(integer -> {
sent.incrementAndGet();
return new PayloadImpl(buff);
});
}
});
});
new BufferedReader(new InputStreamReader(System.in)).readLine();
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment