Last active
January 27, 2017 14:36
-
-
Save rkuhn/e450772db7ecc0c52be7e3c78d528973 to your computer and use it in GitHub Desktop.
demo of missing backpressure propagation in ReactiveSocket Java (Jan 27, 2017)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.rolandkuhn.rs; | |
import java.net.InetSocketAddress; | |
import java.net.SocketAddress; | |
import java.util.concurrent.Executors; | |
import java.util.concurrent.TimeUnit; | |
import io.reactivesocket.ReactiveSocket; | |
import io.reactivesocket.client.KeepAliveProvider; | |
import io.reactivesocket.client.ReactiveSocketClient; | |
import io.reactivesocket.client.SetupProvider; | |
import io.reactivesocket.frame.ByteBufferUtil; | |
import io.reactivesocket.transport.tcp.client.TcpTransportClient; | |
import io.reactivesocket.util.PayloadImpl; | |
import io.reactivex.Flowable; | |
import io.reactivex.schedulers.Schedulers; | |
public class Client { | |
public static class Performance { | |
final String url; | |
final int count; | |
final double avgSize; | |
public Performance(String url, int count, double avgSize) { | |
super(); | |
this.url = url; | |
this.count = count; | |
this.avgSize = avgSize; | |
} | |
public String getUrl() { | |
return url; | |
} | |
public int getCount() { | |
return count; | |
} | |
public double getAvgSize() { | |
return avgSize; | |
} | |
@Override | |
public String toString() { | |
return "Performance [url=" + url + ", count=" + count + ", avgSize=" + avgSize + "]"; | |
} | |
} | |
public static Flowable<Performance> subscribe(ReactiveSocket socket, String request) { | |
return Flowable.fromPublisher( | |
socket.requestSubscription(new PayloadImpl(request))) | |
.map(payload -> payload.getData()) | |
.map(ByteBufferUtil::toUtf8String) | |
.observeOn(Schedulers.from(Executors.newSingleThreadExecutor())) // move one down to "fix" it | |
.buffer(1, TimeUnit.SECONDS) | |
.map(l-> { | |
double avgSize = l | |
.stream() | |
.mapToInt(String::length) | |
.average() | |
.orElse(0.0); | |
return new Performance(request, l.size(), avgSize); | |
}); | |
} | |
public static void main(String[] args) { | |
int port = 9000; | |
String host = "localhost"; | |
SocketAddress address = new InetSocketAddress(host, port); | |
ReactiveSocket socket = Flowable.fromPublisher(ReactiveSocketClient.create(TcpTransportClient.create(address), | |
SetupProvider.keepAlive(KeepAliveProvider.never()).disableLease()).connect()).blockingFirst(); | |
for (int i = 0; i < 1; i++) { | |
subscribe(socket, "localhost:4096:Object"+i) | |
.forEach(System.out::println); | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
io.reactivex.exceptions.MissingBackpressureException: Queue is full?! | |
at io.reactivex.internal.operators.flowable.FlowableObserveOn$BaseObserveOnSubscriber.onNext(FlowableObserveOn.java:113) | |
at io.reactivex.internal.operators.flowable.FlowableMap$MapSubscriber.onNext(FlowableMap.java:67) | |
at io.reactivex.internal.operators.flowable.FlowableMap$MapSubscriber.onNext(FlowableMap.java:67) | |
at io.reactivesocket.reactivestreams.extensions.internal.ValidatingSubscription.safeOnNext(ValidatingSubscription.java:94) | |
at io.reactivesocket.internal.RemoteReceiver.onNext(RemoteReceiver.java:166) | |
at io.reactivesocket.internal.RemoteReceiver.onNext(RemoteReceiver.java:54) | |
at io.reactivesocket.ClientReactiveSocket.handleFrame(ClientReactiveSocket.java:295) | |
at io.reactivesocket.ClientReactiveSocket.handleIncomingFrames(ClientReactiveSocket.java:232) | |
at io.reactivesocket.ClientReactiveSocket$$Lambda$28/1913632409.accept(Unknown Source) | |
at io.reactivesocket.reactivestreams.extensions.internal.publishers.DoOnEventPublisher$$Lambda$29/347586394.accept(Unknown Source) | |
at io.reactivesocket.reactivestreams.extensions.internal.publishers.DoOnEventPublisher$1.onNext(DoOnEventPublisher.java:73) | |
at io.reactivesocket.reactivestreams.extensions.internal.publishers.DoOnEventPublisher$1.onNext(DoOnEventPublisher.java:75) | |
at io.reactivesocket.reactivestreams.extensions.internal.ValidatingSubscription.safeOnNext(ValidatingSubscription.java:94) | |
at io.reactivesocket.internal.ClientServerInputMultiplexer$SourceInput.onNext(ClientServerInputMultiplexer.java:107) | |
at io.reactivesocket.internal.ClientServerInputMultiplexer$SourceInput.onNext(ClientServerInputMultiplexer.java:71) | |
at rx.internal.reactivestreams.PublisherAdapter$1.onNext(PublisherAdapter.java:107) | |
at rx.observers.SafeSubscriber.onNext(SafeSubscriber.java:134) | |
at rx.internal.operators.OperatorSerialize$1.onNext(OperatorSerialize.java:57) | |
at rx.observers.SerializedObserver.onNext(SerializedObserver.java:92) | |
at rx.observers.SerializedSubscriber.onNext(SerializedSubscriber.java:94) | |
at io.reactivex.netty.channel.AbstractConnectionToChannelBridge$ReadProducer.sendOnNext(AbstractConnectionToChannelBridge.java:373) | |
at io.reactivex.netty.channel.AbstractConnectionToChannelBridge.newMessage(AbstractConnectionToChannelBridge.java:189) | |
at io.reactivex.netty.channel.BackpressureManagingHandler.channelRead(BackpressureManagingHandler.java:77) | |
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372) | |
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358) | |
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:350) | |
at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86) | |
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372) | |
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358) | |
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:350) | |
at io.reactivesocket.transport.tcp.ReactiveSocketFrameCodec.channelRead(ReactiveSocketFrameCodec.java:43) | |
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372) | |
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358) | |
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:350) | |
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:293) | |
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:280) | |
at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:396) | |
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:248) | |
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372) | |
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358) | |
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:350) | |
at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102) | |
at io.netty.handler.codec.MessageToMessageCodec.channelRead(MessageToMessageCodec.java:111) | |
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372) | |
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358) | |
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:350) | |
at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86) | |
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372) | |
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358) | |
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:350) | |
at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86) | |
at io.reactivex.netty.channel.BytesInspector.channelRead(BytesInspector.java:56) | |
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372) | |
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358) | |
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:350) | |
at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86) | |
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372) | |
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358) | |
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:350) | |
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1334) | |
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372) | |
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358) | |
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:926) | |
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:129) | |
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:610) | |
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:551) | |
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:465) | |
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:437) | |
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:873) | |
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144) | |
at java.lang.Thread.run(Thread.java:745) | |
Exception in thread "pool-2-thread-1" io.reactivex.exceptions.MissingBackpressureException: Queue is full?! | |
at io.reactivex.internal.operators.flowable.FlowableObserveOn$BaseObserveOnSubscriber.onNext(FlowableObserveOn.java:113) | |
at io.reactivex.internal.operators.flowable.FlowableMap$MapSubscriber.onNext(FlowableMap.java:67) | |
at io.reactivex.internal.operators.flowable.FlowableMap$MapSubscriber.onNext(FlowableMap.java:67) | |
at io.reactivesocket.reactivestreams.extensions.internal.ValidatingSubscription.safeOnNext(ValidatingSubscription.java:94) | |
at io.reactivesocket.internal.RemoteReceiver.onNext(RemoteReceiver.java:166) | |
at io.reactivesocket.internal.RemoteReceiver.onNext(RemoteReceiver.java:54) | |
at io.reactivesocket.ClientReactiveSocket.handleFrame(ClientReactiveSocket.java:295) | |
at io.reactivesocket.ClientReactiveSocket.handleIncomingFrames(ClientReactiveSocket.java:232) | |
at io.reactivesocket.ClientReactiveSocket$$Lambda$28/1913632409.accept(Unknown Source) | |
at io.reactivesocket.reactivestreams.extensions.internal.publishers.DoOnEventPublisher$$Lambda$29/347586394.accept(Unknown Source) | |
at io.reactivesocket.reactivestreams.extensions.internal.publishers.DoOnEventPublisher$1.onNext(DoOnEventPublisher.java:73) | |
at io.reactivesocket.reactivestreams.extensions.internal.publishers.DoOnEventPublisher$1.onNext(DoOnEventPublisher.java:75) | |
at io.reactivesocket.reactivestreams.extensions.internal.ValidatingSubscription.safeOnNext(ValidatingSubscription.java:94) | |
at io.reactivesocket.internal.ClientServerInputMultiplexer$SourceInput.onNext(ClientServerInputMultiplexer.java:107) | |
at io.reactivesocket.internal.ClientServerInputMultiplexer$SourceInput.onNext(ClientServerInputMultiplexer.java:71) | |
at rx.internal.reactivestreams.PublisherAdapter$1.onNext(PublisherAdapter.java:107) | |
at rx.observers.SafeSubscriber.onNext(SafeSubscriber.java:134) | |
at rx.internal.operators.OperatorSerialize$1.onNext(OperatorSerialize.java:57) | |
at rx.observers.SerializedObserver.onNext(SerializedObserver.java:92) | |
at rx.observers.SerializedSubscriber.onNext(SerializedSubscriber.java:94) | |
at io.reactivex.netty.channel.AbstractConnectionToChannelBridge$ReadProducer.sendOnNext(AbstractConnectionToChannelBridge.java:373) | |
at io.reactivex.netty.channel.AbstractConnectionToChannelBridge.newMessage(AbstractConnectionToChannelBridge.java:189) | |
at io.reactivex.netty.channel.BackpressureManagingHandler.channelRead(BackpressureManagingHandler.java:77) | |
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372) | |
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358) | |
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:350) | |
at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86) | |
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372) | |
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358) | |
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:350) | |
at io.reactivesocket.transport.tcp.ReactiveSocketFrameCodec.channelRead(ReactiveSocketFrameCodec.java:43) | |
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372) | |
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358) | |
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:350) | |
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:293) | |
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:280) | |
at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:396) | |
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:248) | |
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372) | |
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358) | |
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:350) | |
at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102) | |
at io.netty.handler.codec.MessageToMessageCodec.channelRead(MessageToMessageCodec.java:111) | |
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372) | |
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358) | |
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:350) | |
at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86) | |
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372) | |
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358) | |
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:350) | |
at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86) | |
at io.reactivex.netty.channel.BytesInspector.channelRead(BytesInspector.java:56) | |
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372) | |
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358) | |
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:350) | |
at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86) | |
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372) | |
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358) | |
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:350) | |
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1334) | |
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372) | |
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358) | |
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:926) | |
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:129) | |
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:610) | |
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:551) | |
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:465) | |
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:437) | |
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:873) | |
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144) | |
at java.lang.Thread.run(Thread.java:745) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.rolandkuhn.rs; | |
import java.io.BufferedReader; | |
import java.io.IOException; | |
import java.io.InputStreamReader; | |
import java.util.Arrays; | |
import org.reactivestreams.Publisher; | |
import io.reactivesocket.AbstractReactiveSocket; | |
import io.reactivesocket.Payload; | |
import io.reactivesocket.frame.ByteBufferUtil; | |
import io.reactivesocket.lease.DisabledLeaseAcceptingSocket; | |
import io.reactivesocket.server.ReactiveSocketServer; | |
import io.reactivesocket.transport.tcp.server.TcpTransportServer; | |
import io.reactivesocket.util.PayloadImpl; | |
import io.reactivex.Flowable; | |
public class Server { | |
public static void main(String[] args) throws IOException { | |
int port = 9000; | |
ReactiveSocketServer.create(TcpTransportServer.create(port)) | |
.start((setupPayload, reactiveSocket) -> { | |
return new DisabledLeaseAcceptingSocket(new AbstractReactiveSocket() { | |
@Override | |
public Publisher<Payload> requestResponse(Payload p) { | |
return Flowable.just(p); | |
} | |
@Override | |
public Publisher<Payload> requestSubscription(Payload p) { | |
String[] request = ByteBufferUtil.toUtf8String(p.getData()).split(":"); | |
int size = Integer.parseInt(request[1]); | |
System.out.println("Got request for " + request); | |
final byte[] buff = new byte[size]; | |
Arrays.fill(buff, (byte)65); | |
return Flowable.generate(emitter -> emitter.onNext(new PayloadImpl(buff))); | |
} | |
}); | |
}); | |
new BufferedReader(new InputStreamReader(System.in)).readLine(); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
commit 8acc0f005daba60dd1d8be2e8eb3e8d9daf640f2 | |
Author: Nitesh Kant <nitesh1706@gmail.com> | |
Date: Thu Jan 26 21:56:09 2017 -0800 |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment