Skip to content

Instantly share code, notes, and snippets.

@Sm0keySa1m0n
Created April 25, 2021 12:08
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Sm0keySa1m0n/f3396a8aa3d5e093d24571ffd5da9c69 to your computer and use it in GitHub Desktop.
Save Sm0keySa1m0n/f3396a8aa3d5e093d24571ffd5da9c69 to your computer and use it in GitHub Desktop.
Demonstrates corruption bug in ScaleCube
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.concurrent.ThreadFactory;
import com.google.common.collect.Maps;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.ByteBufInputStream;
import io.netty.buffer.ByteBufOutputStream;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.Epoll;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.concurrent.DefaultThreadFactory;
import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.core.RSocketConnector;
import io.rsocket.core.RSocketServer;
import io.rsocket.frame.decoder.PayloadDecoder;
import io.rsocket.util.ByteBufPayload;
import io.scalecube.net.Address;
import io.scalecube.services.exceptions.MessageCodecException;
import io.scalecube.services.transport.api.ReferenceCountUtil;
import io.scalecube.services.transport.rsocket.ConnectionSetup;
import io.scalecube.services.transport.rsocket.ConnectionSetupCodec;
import io.scalecube.services.transport.rsocket.DelegatedLoopResources;
import io.scalecube.services.transport.rsocket.RSocketClientTransportFactory;
import io.scalecube.services.transport.rsocket.RSocketServerTransportFactory;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.netty.resources.LoopResources;
public class Test {
private static final EventLoopGroup eventLoopGroup;
private static final LoopResources clientLoopResources;
private static final LoopResources serverLoopResources;
static {
eventLoopGroup = newEventLoopGroup();
clientLoopResources = DelegatedLoopResources.newClientLoopResources(eventLoopGroup);
serverLoopResources = DelegatedLoopResources.newServerLoopResources(eventLoopGroup);
}
private static final ConnectionSetupCodec connectionSetupCodec = new ConnectionSetupCodec() {
@Override
public void encode(OutputStream stream, ConnectionSetup value) throws IOException {
System.out.println("[" + Thread.currentThread().getName() + "] " + "Writing a 5");
stream.write(0x05);
stream.flush();
}
@SuppressWarnings("deprecation")
@Override
public ConnectionSetup decode(InputStream stream) throws IOException {
System.out.println("[" + Thread.currentThread().getName() + "] " + "Reading a " + stream.read());
return new ConnectionSetup();
}
};
private static Payload encodeConnectionSetup(ConnectionSetup connectionSetup) {
ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer();
try {
connectionSetupCodec.encode(new ByteBufOutputStream(byteBuf), connectionSetup);
} catch (Throwable ex) {
ReferenceCountUtil.safestRelease(byteBuf);
System.err.println(String.format("Failed to encode connectionSetup: %s, cause: %S", connectionSetup,
ex.toString()));
throw new MessageCodecException("Failed to encode ConnectionSetup", ex);
}
return ByteBufPayload.create(byteBuf);
}
// ========================================
// Copied directly from ScaleCube Services
// ========================================
private static EventLoopGroup newEventLoopGroup() {
ThreadFactory threadFactory = new DefaultThreadFactory("rsocket-worker", true);
EventLoopGroup eventLoopGroup = Epoll.isAvailable()
? new EpollEventLoopGroup(Runtime.getRuntime().availableProcessors(), threadFactory)
: new NioEventLoopGroup(Runtime.getRuntime().availableProcessors(), threadFactory);
return LoopResources.colocate(eventLoopGroup);
}
private static ConnectionSetup decodeConnectionSetup(ByteBuf byteBuf) {
if (byteBuf.isReadable()) {
try (
ByteBufInputStream stream = new ByteBufInputStream(byteBuf, false /* releaseOnClose */)) {
return connectionSetupCodec.decode(stream);
} catch (Throwable ex) {
ReferenceCountUtil.safestRelease(byteBuf); // release byteBuf
throw new MessageCodecException("Failed to decode ConnectionSetup", ex);
}
}
return null;
}
// ========================================
public static void main(String[] args) throws IOException {
RSocketServer
.create((payload, socket) -> Mono.justOrEmpty(decodeConnectionSetup(payload.data()))
.then()
.cast(RSocket.class))
.bind(RSocketServerTransportFactory.websocket(25565)
.apply(serverLoopResources)
.serverTransport())
.subscribeOn(Schedulers.parallel())
.subscribe();
connect().flatMap(__ -> connect()).block();
}
private static Mono<RSocket> connect() {
return RSocketConnector.create()
.payloadDecoder(PayloadDecoder.DEFAULT)
.setupPayload(encodeConnectionSetup(new ConnectionSetup(Maps.newHashMap())))
.connect(() -> RSocketClientTransportFactory.websocket()
.apply(clientLoopResources)
.clientTransport(Address.create("localhost", 25565)));
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment