-
-
Save Sm0keySa1m0n/f3396a8aa3d5e093d24571ffd5da9c69 to your computer and use it in GitHub Desktop.
Demonstrates corruption bug in ScaleCube
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.io.IOException; | |
import java.io.InputStream; | |
import java.io.OutputStream; | |
import java.util.concurrent.ThreadFactory; | |
import com.google.common.collect.Maps; | |
import io.netty.buffer.ByteBuf; | |
import io.netty.buffer.ByteBufAllocator; | |
import io.netty.buffer.ByteBufInputStream; | |
import io.netty.buffer.ByteBufOutputStream; | |
import io.netty.channel.EventLoopGroup; | |
import io.netty.channel.epoll.Epoll; | |
import io.netty.channel.epoll.EpollEventLoopGroup; | |
import io.netty.channel.nio.NioEventLoopGroup; | |
import io.netty.util.concurrent.DefaultThreadFactory; | |
import io.rsocket.Payload; | |
import io.rsocket.RSocket; | |
import io.rsocket.core.RSocketConnector; | |
import io.rsocket.core.RSocketServer; | |
import io.rsocket.frame.decoder.PayloadDecoder; | |
import io.rsocket.util.ByteBufPayload; | |
import io.scalecube.net.Address; | |
import io.scalecube.services.exceptions.MessageCodecException; | |
import io.scalecube.services.transport.api.ReferenceCountUtil; | |
import io.scalecube.services.transport.rsocket.ConnectionSetup; | |
import io.scalecube.services.transport.rsocket.ConnectionSetupCodec; | |
import io.scalecube.services.transport.rsocket.DelegatedLoopResources; | |
import io.scalecube.services.transport.rsocket.RSocketClientTransportFactory; | |
import io.scalecube.services.transport.rsocket.RSocketServerTransportFactory; | |
import reactor.core.publisher.Mono; | |
import reactor.core.scheduler.Schedulers; | |
import reactor.netty.resources.LoopResources; | |
public class Test { | |
private static final EventLoopGroup eventLoopGroup; | |
private static final LoopResources clientLoopResources; | |
private static final LoopResources serverLoopResources; | |
static { | |
eventLoopGroup = newEventLoopGroup(); | |
clientLoopResources = DelegatedLoopResources.newClientLoopResources(eventLoopGroup); | |
serverLoopResources = DelegatedLoopResources.newServerLoopResources(eventLoopGroup); | |
} | |
private static final ConnectionSetupCodec connectionSetupCodec = new ConnectionSetupCodec() { | |
@Override | |
public void encode(OutputStream stream, ConnectionSetup value) throws IOException { | |
System.out.println("[" + Thread.currentThread().getName() + "] " + "Writing a 5"); | |
stream.write(0x05); | |
stream.flush(); | |
} | |
@SuppressWarnings("deprecation") | |
@Override | |
public ConnectionSetup decode(InputStream stream) throws IOException { | |
System.out.println("[" + Thread.currentThread().getName() + "] " + "Reading a " + stream.read()); | |
return new ConnectionSetup(); | |
} | |
}; | |
private static Payload encodeConnectionSetup(ConnectionSetup connectionSetup) { | |
ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer(); | |
try { | |
connectionSetupCodec.encode(new ByteBufOutputStream(byteBuf), connectionSetup); | |
} catch (Throwable ex) { | |
ReferenceCountUtil.safestRelease(byteBuf); | |
System.err.println(String.format("Failed to encode connectionSetup: %s, cause: %S", connectionSetup, | |
ex.toString())); | |
throw new MessageCodecException("Failed to encode ConnectionSetup", ex); | |
} | |
return ByteBufPayload.create(byteBuf); | |
} | |
// ======================================== | |
// Copied directly from ScaleCube Services | |
// ======================================== | |
private static EventLoopGroup newEventLoopGroup() { | |
ThreadFactory threadFactory = new DefaultThreadFactory("rsocket-worker", true); | |
EventLoopGroup eventLoopGroup = Epoll.isAvailable() | |
? new EpollEventLoopGroup(Runtime.getRuntime().availableProcessors(), threadFactory) | |
: new NioEventLoopGroup(Runtime.getRuntime().availableProcessors(), threadFactory); | |
return LoopResources.colocate(eventLoopGroup); | |
} | |
private static ConnectionSetup decodeConnectionSetup(ByteBuf byteBuf) { | |
if (byteBuf.isReadable()) { | |
try ( | |
ByteBufInputStream stream = new ByteBufInputStream(byteBuf, false /* releaseOnClose */)) { | |
return connectionSetupCodec.decode(stream); | |
} catch (Throwable ex) { | |
ReferenceCountUtil.safestRelease(byteBuf); // release byteBuf | |
throw new MessageCodecException("Failed to decode ConnectionSetup", ex); | |
} | |
} | |
return null; | |
} | |
// ======================================== | |
public static void main(String[] args) throws IOException { | |
RSocketServer | |
.create((payload, socket) -> Mono.justOrEmpty(decodeConnectionSetup(payload.data())) | |
.then() | |
.cast(RSocket.class)) | |
.bind(RSocketServerTransportFactory.websocket(25565) | |
.apply(serverLoopResources) | |
.serverTransport()) | |
.subscribeOn(Schedulers.parallel()) | |
.subscribe(); | |
connect().flatMap(__ -> connect()).block(); | |
} | |
private static Mono<RSocket> connect() { | |
return RSocketConnector.create() | |
.payloadDecoder(PayloadDecoder.DEFAULT) | |
.setupPayload(encodeConnectionSetup(new ConnectionSetup(Maps.newHashMap()))) | |
.connect(() -> RSocketClientTransportFactory.websocket() | |
.apply(clientLoopResources) | |
.clientTransport(Address.create("localhost", 25565))); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment