Created
July 1, 2019 18:06
-
-
Save robertroeser/53feaeb2cfb3027680e3e2b803460ab2 to your computer and use it in GitHub Desktop.
RSocket throttling using request n
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.netifi.demo; | |
import io.rsocket.AbstractRSocket; | |
import io.rsocket.Payload; | |
import io.rsocket.RSocket; | |
import io.rsocket.RSocketFactory; | |
import io.rsocket.transport.netty.client.TcpClientTransport; | |
import io.rsocket.transport.netty.server.TcpServerTransport; | |
import io.rsocket.util.DefaultPayload; | |
import reactor.core.publisher.Flux; | |
import reactor.core.publisher.Mono; | |
import java.nio.ByteBuffer; | |
import java.time.Duration; | |
import java.util.concurrent.ThreadLocalRandom; | |
public class ClientThrottling { | |
public static void main(String... args) { | |
Server server = new Server(); | |
Client client = new Client(); | |
client.getChunks().blockLast(); | |
} | |
public static class Server extends AbstractRSocket { | |
public Server() { | |
RSocketFactory.receive() | |
.acceptor((setup, sendingSocket) -> Mono.just(Server.this)) | |
.transport(TcpServerTransport.create(9090)) | |
.start() | |
.block(); | |
} | |
@Override | |
public Flux<Payload> requestStream(Payload payload) { | |
byte[] chunk = new byte[1024]; | |
ThreadLocalRandom.current().nextBytes(chunk); | |
return Flux.range(1, 1000).map(i -> DefaultPayload.create(chunk)); | |
} | |
} | |
public static class Client { | |
private RSocket rSocket; | |
public Client() { | |
rSocket = RSocketFactory.connect().transport(TcpClientTransport.create(9090)).start().block(); | |
} | |
public Flux<ByteBuffer> getChunks() { | |
return rSocket | |
.requestStream(DefaultPayload.create(DefaultPayload.EMPTY_BUFFER)) | |
.map(Payload::getData) | |
.limitRate(1) | |
.delayElements(Duration.ofSeconds(1)) | |
.doOnNext(s -> System.out.println("here ... " + System.currentTimeMillis())); | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.netifi.demo; | |
import io.rsocket.AbstractRSocket; | |
import io.rsocket.Payload; | |
import io.rsocket.RSocket; | |
import io.rsocket.RSocketFactory; | |
import io.rsocket.transport.netty.client.TcpClientTransport; | |
import io.rsocket.transport.netty.server.TcpServerTransport; | |
import io.rsocket.util.DefaultPayload; | |
import reactor.core.publisher.Flux; | |
import reactor.core.publisher.Mono; | |
import java.nio.ByteBuffer; | |
import java.time.Duration; | |
import java.util.concurrent.ThreadLocalRandom; | |
public class ServerThrottling { | |
public static void main(String... args) { | |
Server server = new Server(); | |
Client client = new Client(); | |
client.getChunks(2500).blockLast(); | |
} | |
public static class Server extends AbstractRSocket { | |
public Server() { | |
RSocketFactory.receive() | |
.acceptor((setup, sendingSocket) -> Mono.just(Server.this)) | |
.transport(TcpServerTransport.create(9090)) | |
.start() | |
.block(); | |
} | |
@Override | |
public Flux<Payload> requestStream(Payload payload) { | |
byte[] chunk = new byte[1024]; | |
ThreadLocalRandom.current().nextBytes(chunk); | |
int millis = payload.getData().getInt(); | |
return Flux.range(1, 1000) | |
.limitRate(1) | |
.delayElements(Duration.ofMillis(millis)) | |
.map(i -> DefaultPayload.create(chunk)); | |
} | |
} | |
public static class Client { | |
private RSocket rSocket; | |
public Client() { | |
rSocket = RSocketFactory.connect().transport(TcpClientTransport.create(9090)).start().block(); | |
} | |
public Flux<ByteBuffer> getChunks(int millis) { | |
ByteBuffer allocate = ByteBuffer.allocate(Integer.BYTES); | |
allocate.putInt(millis); | |
allocate.flip(); | |
return rSocket | |
.requestStream(DefaultPayload.create(allocate)) | |
.map(Payload::getData) | |
.doOnNext(s -> System.out.println("here ... " + System.currentTimeMillis())); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment