Skip to content

Instantly share code, notes, and snippets.

@Sm0keySa1m0n
Created April 26, 2021 09:32
Show Gist options
  • Save Sm0keySa1m0n/bb33abcb32adf0ae9c26d6f640420905 to your computer and use it in GitHub Desktop.
Save Sm0keySa1m0n/bb33abcb32adf0ae9c26d6f640420905 to your computer and use it in GitHub Desktop.
RSocket ByteBuf corruption
import java.io.IOException;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.core.RSocketConnector;
import io.rsocket.core.RSocketServer;
import io.rsocket.transport.netty.client.WebsocketClientTransport;
import io.rsocket.transport.netty.server.WebsocketServerTransport;
import io.rsocket.util.ByteBufPayload;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
public class Test {
private static Payload encodeConnectionSetup() {
ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer();
System.out.println("[" + Thread.currentThread().getName() + "] " + "Writing a 5");
byteBuf.writeByte(0x05);
return ByteBufPayload.create(byteBuf);
}
private static void decodeConnectionSetup(ByteBuf byteBuf) {
if (byteBuf.isReadable()) {
System.out.println(
"[" + Thread.currentThread().getName() + "] " + "Reading a " + byteBuf.readByte());
}
}
// ========================================
public static void main(String[] args) throws IOException {
RSocketServer
.create((payload, socket) -> Mono.fromRunnable(() -> decodeConnectionSetup(payload.data()))
.then().cast(RSocket.class))
.bind(WebsocketServerTransport.create(25565)).subscribeOn(Schedulers.parallel())
.subscribe();
connect().flatMap(__ -> connect()).block();
}
private static Mono<RSocket> connect() {
return RSocketConnector.create().setupPayload(encodeConnectionSetup())
.connect(WebsocketClientTransport.create(25565));
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment