Skip to content

Instantly share code, notes, and snippets.

@robertroeser
Created July 1, 2019 18:06
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save robertroeser/53feaeb2cfb3027680e3e2b803460ab2 to your computer and use it in GitHub Desktop.
Save robertroeser/53feaeb2cfb3027680e3e2b803460ab2 to your computer and use it in GitHub Desktop.
RSocket throttling using request(n)
package com.netifi.demo;
import io.rsocket.AbstractRSocket;
import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.RSocketFactory;
import io.rsocket.transport.netty.client.TcpClientTransport;
import io.rsocket.transport.netty.server.TcpServerTransport;
import io.rsocket.util.DefaultPayload;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.concurrent.ThreadLocalRandom;
public class ClientThrottling {
public static void main(String... args) {
Server server = new Server();
Client client = new Client();
client.getChunks().blockLast();
}
public static class Server extends AbstractRSocket {
public Server() {
RSocketFactory.receive()
.acceptor((setup, sendingSocket) -> Mono.just(Server.this))
.transport(TcpServerTransport.create(9090))
.start()
.block();
}
@Override
public Flux<Payload> requestStream(Payload payload) {
byte[] chunk = new byte[1024];
ThreadLocalRandom.current().nextBytes(chunk);
return Flux.range(1, 1000).map(i -> DefaultPayload.create(chunk));
}
}
public static class Client {
private RSocket rSocket;
public Client() {
rSocket = RSocketFactory.connect().transport(TcpClientTransport.create(9090)).start().block();
}
public Flux<ByteBuffer> getChunks() {
return rSocket
.requestStream(DefaultPayload.create(DefaultPayload.EMPTY_BUFFER))
.map(Payload::getData)
.limitRate(1)
.delayElements(Duration.ofSeconds(1))
.doOnNext(s -> System.out.println("here ... " + System.currentTimeMillis()));
}
}
}
package com.netifi.demo;
import io.rsocket.AbstractRSocket;
import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.RSocketFactory;
import io.rsocket.transport.netty.client.TcpClientTransport;
import io.rsocket.transport.netty.server.TcpServerTransport;
import io.rsocket.util.DefaultPayload;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.concurrent.ThreadLocalRandom;
public class ServerThrottling {
public static void main(String... args) {
Server server = new Server();
Client client = new Client();
client.getChunks(2500).blockLast();
}
public static class Server extends AbstractRSocket {
public Server() {
RSocketFactory.receive()
.acceptor((setup, sendingSocket) -> Mono.just(Server.this))
.transport(TcpServerTransport.create(9090))
.start()
.block();
}
@Override
public Flux<Payload> requestStream(Payload payload) {
byte[] chunk = new byte[1024];
ThreadLocalRandom.current().nextBytes(chunk);
int millis = payload.getData().getInt();
return Flux.range(1, 1000)
.limitRate(1)
.delayElements(Duration.ofMillis(millis))
.map(i -> DefaultPayload.create(chunk));
}
}
public static class Client {
private RSocket rSocket;
public Client() {
rSocket = RSocketFactory.connect().transport(TcpClientTransport.create(9090)).start().block();
}
public Flux<ByteBuffer> getChunks(int millis) {
ByteBuffer allocate = ByteBuffer.allocate(Integer.BYTES);
allocate.putInt(millis);
allocate.flip();
return rSocket
.requestStream(DefaultPayload.create(allocate))
.map(Payload::getData)
.doOnNext(s -> System.out.println("here ... " + System.currentTimeMillis()));
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment