Skip to content

Instantly share code, notes, and snippets.

@DSeeLP
Created March 29, 2022 08:27
Show Gist options
  • Save DSeeLP/b68f8616eb3f49143f4a0f4ba02148c8 to your computer and use it in GitHub Desktop.
Save DSeeLP/b68f8616eb3f49143f4a0f4ba02148c8 to your computer and use it in GitHub Desktop.
ChannelPipeline, Decoders and Encoders
class BytePacketDecoder: MessageToMessageDecoder<ByteBuf>() {
override fun decode(ctx: ChannelHandlerContext, buffer: ByteBuf, out: MutableList<Any>) {
val packetId = buffer.readInt()
val useResponseCaching = buffer.readBoolean()
val requestIdBuffer = buffer.readBytes(16)
val requestId = UUID(ByteBufUtil.getBytes(requestIdBuffer))
requestIdBuffer.release()
val bodyLength = buffer.readInt()
val body = buffer.readRetainedSlice(bodyLength)
val packet = RawPacket(packetId, useResponseCaching, requestId, null, body)
if (buffer.readableBytes() > 0) {
println("Buffer has readable bytes left! ${buffer.readableBytes()}")
}
buffer.discardReadBytes()
out.add(packet)
}
}
fun initialize(
channel: Channel,
decoderDirection: PacketDirection,
encoderDirection: PacketDirection,
packetEncoder: PacketEncoder = PacketEncoder(encoderDirection),
responseHandler: ResponseHandler = ResponseHandler(encoderDirection),
packetHandler: PacketHandler = PacketHandler(),
packetMonitor: PacketMonitor = PacketMonitor()
) {
val scope = ChannelCoroutineScope()
channel.coroutineScopeAttribute.setIfAbsent(scope)
val pipeline = channel.pipeline()
//pipeline.addLast("flush-consolidation-handler", FlushConsolidationHandler())
pipeline.addLast("first", ExceptionHandler)
pipeline.addLast("frameDecoder", LengthFieldBasedFrameDecoder(65535, 0, 2, 0, 2))
pipeline.addLast("byte-decoder", BytePacketDecoder())
pipeline.addLast("decoder", PacketDecoder(decoderDirection))
pipeline.addLast("prepender", LengthFieldPrepender(2))
pipeline.addLast("encoder", packetEncoder)
pipeline.addLast("exception-handler", ExceptionHandler)
pipeline.addLast("coroutines", scope.ChannelHandler())
pipeline.addBefore("decoder", "response-cache", responseHandler)
pipeline.addAfter("decoder", "packet-handler", packetHandler)
pipeline.addLast("monitor", packetMonitor)
}
@Sharable
class PacketDecoder(val packetDirection: PacketDirection) : MessageToMessageDecoder<RawPacket>() {
override fun decode(ctx: ChannelHandlerContext, raw: RawPacket, out: MutableList<Any>) {
println("normal decoder hit")
raw.headers?.release()
val body = raw.body
val packets = ctx.protocol.getPackets(packetDirection)
val packet = packets.createPacket(raw.packetId, PacketMeta(raw.requestId), body)
body.release()
if (packet.handleError()) return
out.add(packet.orNull()!!)
}
}
//This file contains the Encoder
package utils.codec.encoding
import com.github.benmanes.caffeine.cache.Cache
import com.github.benmanes.caffeine.cache.Caffeine
import com.github.benmanes.caffeine.cache.RemovalListener
import io.netty.buffer.ByteBuf
import io.netty.channel.ChannelDuplexHandler
import io.netty.channel.ChannelHandler.Sharable
import io.netty.channel.ChannelHandlerContext
import io.netty.channel.ChannelInboundHandlerAdapter
import io.netty.handler.codec.MessageToByteEncoder
import io.netty.util.ReferenceCountUtil
import kotlinx.uuid.UUID
import utils.codec.decoding.RawPacket
import utils.protocol.PacketDirection
import utils.protocol.PacketResponse
import java.time.Duration
@Sharable
class ResponseHandler(val packetDirection: PacketDirection) : ChannelInboundHandlerAdapter() {
companion object {
const val MAX_CACHE_SIZE: Long = 200
}
private val encoder = Encoder()
val responseCache: Cache<UUID, ByteBuf> = Caffeine.newBuilder()
.expireAfterWrite(Duration.ofMinutes(10))
.maximumSize(MAX_CACHE_SIZE)
.evictionListener(RemovalListener<UUID, ByteBuf> { _, value, _ -> value?.let { ReferenceCountUtil.release(it) } })
.build()
override fun handlerAdded(ctx: ChannelHandlerContext) {
ctx.pipeline().addAfter("encoder", "response-encoder", encoder)
}
override fun channelRead(ctx: ChannelHandlerContext, msg: Any) {
if (msg is RawPacket && msg.useResponseCaching) {
val byteBuf = responseCache.getIfPresent(msg.requestId)
println("Looking in cache: ${msg.requestId} Result: $byteBuf")
if (byteBuf == null) {
super.channelRead(ctx, msg)
return
}
println("Flushing cached packet")
ctx.write(byteBuf)
return
}
super.channelRead(ctx, msg)
}
@Sharable
//This is the Encoder
inner class Encoder : MessageToByteEncoder<PacketResponse>() {
override fun encode(ctx: ChannelHandlerContext, msg: PacketResponse, out: ByteBuf) {
val cached = responseCache.getIfPresent(msg.requestId)
if (cached != null) {
println("Written cached response")
out.writeBytes(cached)
return
}
println("Cached Packet: ${msg.requestId}")
PacketEncoder.encode(ctx, msg.response, out, packetDirection)
responseCache.put(msg.requestId, out.duplicate())
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment