Skip to content

Instantly share code, notes, and snippets.

@rkuhn
Last active January 27, 2017 14:36
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save rkuhn/e450772db7ecc0c52be7e3c78d528973 to your computer and use it in GitHub Desktop.
Save rkuhn/e450772db7ecc0c52be7e3c78d528973 to your computer and use it in GitHub Desktop.
demo of missing backpressure propagation in ReactiveSocket Java (Jan 27, 2017)
package com.rolandkuhn.rs;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import io.reactivesocket.ReactiveSocket;
import io.reactivesocket.client.KeepAliveProvider;
import io.reactivesocket.client.ReactiveSocketClient;
import io.reactivesocket.client.SetupProvider;
import io.reactivesocket.frame.ByteBufferUtil;
import io.reactivesocket.transport.tcp.client.TcpTransportClient;
import io.reactivesocket.util.PayloadImpl;
import io.reactivex.Flowable;
import io.reactivex.schedulers.Schedulers;
public class Client {
public static class Performance {
final String url;
final int count;
final double avgSize;
public Performance(String url, int count, double avgSize) {
super();
this.url = url;
this.count = count;
this.avgSize = avgSize;
}
public String getUrl() {
return url;
}
public int getCount() {
return count;
}
public double getAvgSize() {
return avgSize;
}
@Override
public String toString() {
return "Performance [url=" + url + ", count=" + count + ", avgSize=" + avgSize + "]";
}
}
public static Flowable<Performance> subscribe(ReactiveSocket socket, String request) {
return Flowable.fromPublisher(
socket.requestSubscription(new PayloadImpl(request)))
.map(payload -> payload.getData())
.map(ByteBufferUtil::toUtf8String)
.observeOn(Schedulers.from(Executors.newSingleThreadExecutor())) // move one down to "fix" it
.buffer(1, TimeUnit.SECONDS)
.map(l-> {
double avgSize = l
.stream()
.mapToInt(String::length)
.average()
.orElse(0.0);
return new Performance(request, l.size(), avgSize);
});
}
public static void main(String[] args) {
int port = 9000;
String host = "localhost";
SocketAddress address = new InetSocketAddress(host, port);
ReactiveSocket socket = Flowable.fromPublisher(ReactiveSocketClient.create(TcpTransportClient.create(address),
SetupProvider.keepAlive(KeepAliveProvider.never()).disableLease()).connect()).blockingFirst();
for (int i = 0; i < 1; i++) {
subscribe(socket, "localhost:4096:Object"+i)
.forEach(System.out::println);
}
}
}
io.reactivex.exceptions.MissingBackpressureException: Queue is full?!
at io.reactivex.internal.operators.flowable.FlowableObserveOn$BaseObserveOnSubscriber.onNext(FlowableObserveOn.java:113)
at io.reactivex.internal.operators.flowable.FlowableMap$MapSubscriber.onNext(FlowableMap.java:67)
at io.reactivex.internal.operators.flowable.FlowableMap$MapSubscriber.onNext(FlowableMap.java:67)
at io.reactivesocket.reactivestreams.extensions.internal.ValidatingSubscription.safeOnNext(ValidatingSubscription.java:94)
at io.reactivesocket.internal.RemoteReceiver.onNext(RemoteReceiver.java:166)
at io.reactivesocket.internal.RemoteReceiver.onNext(RemoteReceiver.java:54)
at io.reactivesocket.ClientReactiveSocket.handleFrame(ClientReactiveSocket.java:295)
at io.reactivesocket.ClientReactiveSocket.handleIncomingFrames(ClientReactiveSocket.java:232)
at io.reactivesocket.ClientReactiveSocket$$Lambda$28/1913632409.accept(Unknown Source)
at io.reactivesocket.reactivestreams.extensions.internal.publishers.DoOnEventPublisher$$Lambda$29/347586394.accept(Unknown Source)
at io.reactivesocket.reactivestreams.extensions.internal.publishers.DoOnEventPublisher$1.onNext(DoOnEventPublisher.java:73)
at io.reactivesocket.reactivestreams.extensions.internal.publishers.DoOnEventPublisher$1.onNext(DoOnEventPublisher.java:75)
at io.reactivesocket.reactivestreams.extensions.internal.ValidatingSubscription.safeOnNext(ValidatingSubscription.java:94)
at io.reactivesocket.internal.ClientServerInputMultiplexer$SourceInput.onNext(ClientServerInputMultiplexer.java:107)
at io.reactivesocket.internal.ClientServerInputMultiplexer$SourceInput.onNext(ClientServerInputMultiplexer.java:71)
at rx.internal.reactivestreams.PublisherAdapter$1.onNext(PublisherAdapter.java:107)
at rx.observers.SafeSubscriber.onNext(SafeSubscriber.java:134)
at rx.internal.operators.OperatorSerialize$1.onNext(OperatorSerialize.java:57)
at rx.observers.SerializedObserver.onNext(SerializedObserver.java:92)
at rx.observers.SerializedSubscriber.onNext(SerializedSubscriber.java:94)
at io.reactivex.netty.channel.AbstractConnectionToChannelBridge$ReadProducer.sendOnNext(AbstractConnectionToChannelBridge.java:373)
at io.reactivex.netty.channel.AbstractConnectionToChannelBridge.newMessage(AbstractConnectionToChannelBridge.java:189)
at io.reactivex.netty.channel.BackpressureManagingHandler.channelRead(BackpressureManagingHandler.java:77)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:350)
at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:350)
at io.reactivesocket.transport.tcp.ReactiveSocketFrameCodec.channelRead(ReactiveSocketFrameCodec.java:43)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:350)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:293)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:280)
at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:396)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:248)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:350)
at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
at io.netty.handler.codec.MessageToMessageCodec.channelRead(MessageToMessageCodec.java:111)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:350)
at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:350)
at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86)
at io.reactivex.netty.channel.BytesInspector.channelRead(BytesInspector.java:56)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:350)
at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:350)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1334)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:926)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:129)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:610)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:551)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:465)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:437)
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:873)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
at java.lang.Thread.run(Thread.java:745)
Exception in thread "pool-2-thread-1" io.reactivex.exceptions.MissingBackpressureException: Queue is full?!
at io.reactivex.internal.operators.flowable.FlowableObserveOn$BaseObserveOnSubscriber.onNext(FlowableObserveOn.java:113)
at io.reactivex.internal.operators.flowable.FlowableMap$MapSubscriber.onNext(FlowableMap.java:67)
at io.reactivex.internal.operators.flowable.FlowableMap$MapSubscriber.onNext(FlowableMap.java:67)
at io.reactivesocket.reactivestreams.extensions.internal.ValidatingSubscription.safeOnNext(ValidatingSubscription.java:94)
at io.reactivesocket.internal.RemoteReceiver.onNext(RemoteReceiver.java:166)
at io.reactivesocket.internal.RemoteReceiver.onNext(RemoteReceiver.java:54)
at io.reactivesocket.ClientReactiveSocket.handleFrame(ClientReactiveSocket.java:295)
at io.reactivesocket.ClientReactiveSocket.handleIncomingFrames(ClientReactiveSocket.java:232)
at io.reactivesocket.ClientReactiveSocket$$Lambda$28/1913632409.accept(Unknown Source)
at io.reactivesocket.reactivestreams.extensions.internal.publishers.DoOnEventPublisher$$Lambda$29/347586394.accept(Unknown Source)
at io.reactivesocket.reactivestreams.extensions.internal.publishers.DoOnEventPublisher$1.onNext(DoOnEventPublisher.java:73)
at io.reactivesocket.reactivestreams.extensions.internal.publishers.DoOnEventPublisher$1.onNext(DoOnEventPublisher.java:75)
at io.reactivesocket.reactivestreams.extensions.internal.ValidatingSubscription.safeOnNext(ValidatingSubscription.java:94)
at io.reactivesocket.internal.ClientServerInputMultiplexer$SourceInput.onNext(ClientServerInputMultiplexer.java:107)
at io.reactivesocket.internal.ClientServerInputMultiplexer$SourceInput.onNext(ClientServerInputMultiplexer.java:71)
at rx.internal.reactivestreams.PublisherAdapter$1.onNext(PublisherAdapter.java:107)
at rx.observers.SafeSubscriber.onNext(SafeSubscriber.java:134)
at rx.internal.operators.OperatorSerialize$1.onNext(OperatorSerialize.java:57)
at rx.observers.SerializedObserver.onNext(SerializedObserver.java:92)
at rx.observers.SerializedSubscriber.onNext(SerializedSubscriber.java:94)
at io.reactivex.netty.channel.AbstractConnectionToChannelBridge$ReadProducer.sendOnNext(AbstractConnectionToChannelBridge.java:373)
at io.reactivex.netty.channel.AbstractConnectionToChannelBridge.newMessage(AbstractConnectionToChannelBridge.java:189)
at io.reactivex.netty.channel.BackpressureManagingHandler.channelRead(BackpressureManagingHandler.java:77)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:350)
at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:350)
at io.reactivesocket.transport.tcp.ReactiveSocketFrameCodec.channelRead(ReactiveSocketFrameCodec.java:43)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:350)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:293)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:280)
at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:396)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:248)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:350)
at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
at io.netty.handler.codec.MessageToMessageCodec.channelRead(MessageToMessageCodec.java:111)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:350)
at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:350)
at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86)
at io.reactivex.netty.channel.BytesInspector.channelRead(BytesInspector.java:56)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:350)
at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:350)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1334)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:926)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:129)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:610)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:551)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:465)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:437)
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:873)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
at java.lang.Thread.run(Thread.java:745)
package com.rolandkuhn.rs;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.Arrays;
import org.reactivestreams.Publisher;
import io.reactivesocket.AbstractReactiveSocket;
import io.reactivesocket.Payload;
import io.reactivesocket.frame.ByteBufferUtil;
import io.reactivesocket.lease.DisabledLeaseAcceptingSocket;
import io.reactivesocket.server.ReactiveSocketServer;
import io.reactivesocket.transport.tcp.server.TcpTransportServer;
import io.reactivesocket.util.PayloadImpl;
import io.reactivex.Flowable;
public class Server {
public static void main(String[] args) throws IOException {
int port = 9000;
ReactiveSocketServer.create(TcpTransportServer.create(port))
.start((setupPayload, reactiveSocket) -> {
return new DisabledLeaseAcceptingSocket(new AbstractReactiveSocket() {
@Override
public Publisher<Payload> requestResponse(Payload p) {
return Flowable.just(p);
}
@Override
public Publisher<Payload> requestSubscription(Payload p) {
String[] request = ByteBufferUtil.toUtf8String(p.getData()).split(":");
int size = Integer.parseInt(request[1]);
System.out.println("Got request for " + request);
final byte[] buff = new byte[size];
Arrays.fill(buff, (byte)65);
return Flowable.generate(emitter -> emitter.onNext(new PayloadImpl(buff)));
}
});
});
new BufferedReader(new InputStreamReader(System.in)).readLine();
}
}
commit 8acc0f005daba60dd1d8be2e8eb3e8d9daf640f2
Author: Nitesh Kant <nitesh1706@gmail.com>
Date: Thu Jan 26 21:56:09 2017 -0800
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment