Skip to content

Instantly share code, notes, and snippets.

@mostroverkhov
Last active February 12, 2018 22:50
Show Gist options
  • Save mostroverkhov/fc0e6202ea7dd201ab36cbc5600309b7 to your computer and use it in GitHub Desktop.
Save mostroverkhov/fc0e6202ea7dd201ab36cbc5600309b7 to your computer and use it in GitHub Desktop.
package io.rsocket;
import io.rsocket.test.util.TestDuplexConnection;
import io.rsocket.util.DefaultPayload;
import org.junit.Before;
import org.junit.Test;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
import java.time.Duration;
public class RSocketClientSendTest {
private Mono<Payload> request;
@Before
public void setUp() throws Exception {
SendFrameThenErrorConnection conn =
new SendFrameThenErrorConnection();
RSocketClient rSocketClient = new RSocketClient(
conn,
DefaultPayload::create,
Throwable::printStackTrace,
StreamIdSupplier.clientSupplier());
request = rSocketClient
.requestResponse(
DefaultPayload.create("request"));
}
@Test
public void send() throws Exception {
/*first request fails as expected*/
expectError();
}
@Test
public void sendAfterError() throws Exception {
/*first request fails as expected*/
expectError();
/*subsequent requests never complete,
and their associated receivers (and senders for Channel) never removed*/
expectError();
}
private void expectError() {
StepVerifier.create(request)
.expectNextCount(0)
.expectError()
.verify(Duration.ofSeconds(10));
}
private static class SendFrameThenErrorConnection extends TestDuplexConnection {
@Override
public Mono<Void> send(Publisher<Frame> frames) {
return Flux.from(frames)
.take(1)
.then(Mono.error(new RuntimeException("err")));
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment