-
-
Save DSeeLP/b68f8616eb3f49143f4a0f4ba02148c8 to your computer and use it in GitHub Desktop.
ChannelPipeline, Decoders and Encoders
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class BytePacketDecoder: MessageToMessageDecoder<ByteBuf>() { | |
override fun decode(ctx: ChannelHandlerContext, buffer: ByteBuf, out: MutableList<Any>) { | |
val packetId = buffer.readInt() | |
val useResponseCaching = buffer.readBoolean() | |
val requestIdBuffer = buffer.readBytes(16) | |
val requestId = UUID(ByteBufUtil.getBytes(requestIdBuffer)) | |
requestIdBuffer.release() | |
val bodyLength = buffer.readInt() | |
val body = buffer.readRetainedSlice(bodyLength) | |
val packet = RawPacket(packetId, useResponseCaching, requestId, null, body) | |
if (buffer.readableBytes() > 0) { | |
println("Buffer has readable bytes left! ${buffer.readableBytes()}") | |
} | |
buffer.discardReadBytes() | |
out.add(packet) | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
fun initialize( | |
channel: Channel, | |
decoderDirection: PacketDirection, | |
encoderDirection: PacketDirection, | |
packetEncoder: PacketEncoder = PacketEncoder(encoderDirection), | |
responseHandler: ResponseHandler = ResponseHandler(encoderDirection), | |
packetHandler: PacketHandler = PacketHandler(), | |
packetMonitor: PacketMonitor = PacketMonitor() | |
) { | |
val scope = ChannelCoroutineScope() | |
channel.coroutineScopeAttribute.setIfAbsent(scope) | |
val pipeline = channel.pipeline() | |
//pipeline.addLast("flush-consolidation-handler", FlushConsolidationHandler()) | |
pipeline.addLast("first", ExceptionHandler) | |
pipeline.addLast("frameDecoder", LengthFieldBasedFrameDecoder(65535, 0, 2, 0, 2)) | |
pipeline.addLast("byte-decoder", BytePacketDecoder()) | |
pipeline.addLast("decoder", PacketDecoder(decoderDirection)) | |
pipeline.addLast("prepender", LengthFieldPrepender(2)) | |
pipeline.addLast("encoder", packetEncoder) | |
pipeline.addLast("exception-handler", ExceptionHandler) | |
pipeline.addLast("coroutines", scope.ChannelHandler()) | |
pipeline.addBefore("decoder", "response-cache", responseHandler) | |
pipeline.addAfter("decoder", "packet-handler", packetHandler) | |
pipeline.addLast("monitor", packetMonitor) | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
@Sharable | |
class PacketDecoder(val packetDirection: PacketDirection) : MessageToMessageDecoder<RawPacket>() { | |
override fun decode(ctx: ChannelHandlerContext, raw: RawPacket, out: MutableList<Any>) { | |
println("normal decoder hit") | |
raw.headers?.release() | |
val body = raw.body | |
val packets = ctx.protocol.getPackets(packetDirection) | |
val packet = packets.createPacket(raw.packetId, PacketMeta(raw.requestId), body) | |
body.release() | |
if (packet.handleError()) return | |
out.add(packet.orNull()!!) | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
//This file contains the Encoder | |
package utils.codec.encoding | |
import com.github.benmanes.caffeine.cache.Cache | |
import com.github.benmanes.caffeine.cache.Caffeine | |
import com.github.benmanes.caffeine.cache.RemovalListener | |
import io.netty.buffer.ByteBuf | |
import io.netty.channel.ChannelDuplexHandler | |
import io.netty.channel.ChannelHandler.Sharable | |
import io.netty.channel.ChannelHandlerContext | |
import io.netty.channel.ChannelInboundHandlerAdapter | |
import io.netty.handler.codec.MessageToByteEncoder | |
import io.netty.util.ReferenceCountUtil | |
import kotlinx.uuid.UUID | |
import utils.codec.decoding.RawPacket | |
import utils.protocol.PacketDirection | |
import utils.protocol.PacketResponse | |
import java.time.Duration | |
@Sharable | |
class ResponseHandler(val packetDirection: PacketDirection) : ChannelInboundHandlerAdapter() { | |
companion object { | |
const val MAX_CACHE_SIZE: Long = 200 | |
} | |
private val encoder = Encoder() | |
val responseCache: Cache<UUID, ByteBuf> = Caffeine.newBuilder() | |
.expireAfterWrite(Duration.ofMinutes(10)) | |
.maximumSize(MAX_CACHE_SIZE) | |
.evictionListener(RemovalListener<UUID, ByteBuf> { _, value, _ -> value?.let { ReferenceCountUtil.release(it) } }) | |
.build() | |
override fun handlerAdded(ctx: ChannelHandlerContext) { | |
ctx.pipeline().addAfter("encoder", "response-encoder", encoder) | |
} | |
override fun channelRead(ctx: ChannelHandlerContext, msg: Any) { | |
if (msg is RawPacket && msg.useResponseCaching) { | |
val byteBuf = responseCache.getIfPresent(msg.requestId) | |
println("Looking in cache: ${msg.requestId} Result: $byteBuf") | |
if (byteBuf == null) { | |
super.channelRead(ctx, msg) | |
return | |
} | |
println("Flushing cached packet") | |
ctx.write(byteBuf) | |
return | |
} | |
super.channelRead(ctx, msg) | |
} | |
@Sharable | |
//This is the Encoder | |
inner class Encoder : MessageToByteEncoder<PacketResponse>() { | |
override fun encode(ctx: ChannelHandlerContext, msg: PacketResponse, out: ByteBuf) { | |
val cached = responseCache.getIfPresent(msg.requestId) | |
if (cached != null) { | |
println("Written cached response") | |
out.writeBytes(cached) | |
return | |
} | |
println("Cached Packet: ${msg.requestId}") | |
PacketEncoder.encode(ctx, msg.response, out, packetDirection) | |
responseCache.put(msg.requestId, out.duplicate()) | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment