Skip to content

Instantly share code, notes, and snippets.

@robertroeser
Created February 12, 2019 23:16
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save robertroeser/bbe44318e78ae2a40b1b850bfba55b43 to your computer and use it in GitHub Desktop.
Save robertroeser/bbe44318e78ae2a40b1b850bfba55b43 to your computer and use it in GitHub Desktop.
package io.netifi.rsocket.example;
import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.RSocketFactory;
import io.rsocket.transport.netty.client.TcpClientTransport;
import io.rsocket.util.DefaultPayload;
import reactor.core.publisher.Flux;
/**
 * Example RSocket client: opens a TCP connection, streams a fixed number of text payloads to the
 * server over a single request-channel interaction, and prints every payload that comes back.
 */
public class Client {
  /** Port handed to {@link TcpClientTransport#create(int)}. */
  private static final int PORT = 7878;

  /** Number of payloads emitted on the outbound side of the channel. */
  private static final int MESSAGE_COUNT = 1_000_000;

  /**
   * Connects, runs the channel exchange until the response stream terminates, then closes the
   * connection.
   *
   * @param args command-line arguments; not used
   */
  public static void main(String... args) {
    RSocket rSocket =
        RSocketFactory.connect().transport(TcpClientTransport.create(PORT)).start().block();
    try {
      // Each outbound payload carries its sequence number and the time it was created.
      Flux<Payload> outbound =
          Flux.range(1, MESSAGE_COUNT)
              .map(
                  i ->
                      DefaultPayload.create(
                          i + " - sending message " + System.currentTimeMillis()));

      rSocket
          .requestChannel(outbound)
          .doOnNext(
              payload -> {
                String dataUtf8 = payload.getDataUtf8();
                System.out.println(
                    "got a message [" + dataUtf8 + "] at " + System.currentTimeMillis());
              })
          // Block the main thread until the response stream completes or fails.
          .blockLast();
    } finally {
      // Release the connection whether the exchange completed normally or blockLast() threw;
      // previously the socket was simply abandoned when main returned.
      rSocket.dispose();
    }
  }
}
package io.netifi.rsocket.example;
import io.rsocket.*;
import io.rsocket.transport.netty.server.TcpServerTransport;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.time.Duration;
/**
 * Example RSocket server: listens on a TCP port and answers every request-channel interaction by
 * sending the incoming payloads back, in their original order, after a short fixed delay.
 */
public class Server {
  /** Port handed to {@link TcpServerTransport#create(int)}. */
  private static final int PORT = 7878;

  /** Time shift applied to each payload before it is sent back. */
  private static final Duration ECHO_DELAY = Duration.ofMillis(10);

  /**
   * Starts the server and blocks the calling thread until the server is closed.
   *
   * @param args command-line arguments; not used
   */
  public static void main(String... args) {
    // A fresh handler instance is created for every accepted connection.
    SocketAcceptor acceptor = (setup, sendingSocket) -> Mono.just(new DelayedEchoSocket());

    RSocketFactory.receive()
        .acceptor(acceptor)
        .transport(TcpServerTransport.create(PORT))
        .start()
        .block()
        .onClose()
        .block();
  }

  /** Responder that returns each payload received on a channel after {@link #ECHO_DELAY}. */
  private static final class DelayedEchoSocket extends AbstractRSocket {
    @Override
    public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
      // flatMapSequential keeps the output in source order even though the per-payload
      // delayed publishers run concurrently.
      return Flux.from(payloads)
          .flatMapSequential(payload -> Flux.just(payload).delaySequence(ECHO_DELAY));
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment