Skip to content

Instantly share code, notes, and snippets.

@ponkotuy
Created December 27, 2014 13:04
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ponkotuy/344ca65eb411a64c7e96 to your computer and use it in GitHub Desktop.
Save ponkotuy/344ca65eb411a64c7e96 to your computer and use it in GitHub Desktop.
NettyProxySample
package com.ponkotuy.proxy
import java.nio.charset.StandardCharsets
import com.netaporter.uri.Uri
import io.netty.bootstrap.{Bootstrap, ServerBootstrap}
import io.netty.buffer.{ByteBuf, Unpooled, UnpooledByteBufAllocator}
import io.netty.channel._
import io.netty.channel.nio.NioEventLoopGroup
import io.netty.channel.socket.SocketChannel
import io.netty.channel.socket.nio.NioServerSocketChannel
import io.netty.handler.logging.{LogLevel, LoggingHandler}
/**
*
* @author ponkotuy
* Date: 14/12/27.
*/
object NettyProxy extends App {
val boss = new NioEventLoopGroup(1)
val worker = new NioEventLoopGroup()
try {
val b = new ServerBootstrap()
b.group(boss, worker)
.channel(classOf[NioServerSocketChannel])
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new NettyProxyInitializer("125.6.189.39", 80))
.childOption(ChannelOption.AUTO_READ, java.lang.Boolean.FALSE)
.bind(8080).sync().channel().closeFuture().sync()
} finally {
boss.shutdownGracefully()
worker.shutdownGracefully()
}
}
class NettyProxyInitializer(remoteHost: String, remotePort: Int) extends ChannelInitializer[SocketChannel] {
override def initChannel(ch: SocketChannel): Unit = {
ch.pipeline().addLast(
new LoggingHandler(LogLevel.INFO),
new NettyProxyFrontendHandler(remoteHost, remotePort)
)
}
}
class NettyProxyFrontendHandler(remoteHost: String, remotePort: Int) extends ChannelInboundHandlerAdapter {
import com.ponkotuy.proxy.NettyProxyFrontendHandler._
@volatile var outbound: Channel = null
override def channelActive(ctx: ChannelHandlerContext): Unit = {
val inbound = ctx.channel()
val b = new Bootstrap()
b.group(inbound.eventLoop())
.channel(ctx.channel().getClass)
.handler(new NettyProxyBackendHandler(inbound))
.option(ChannelOption.AUTO_READ, java.lang.Boolean.FALSE)
val f = b.connect(remoteHost, remotePort)
outbound = f.channel()
f.addListener(new ActiveListener(inbound))
}
override def channelRead(ctx: ChannelHandlerContext, msg: Any): Unit = {
if(outbound != null && outbound.isActive) {
msg match {
case buf: ByteBuf =>
val str = buf.toString(StandardCharsets.US_ASCII)
val Array(method, origUri, _) = str.lines.next().split(' ')
val result = if(method == "POST") {
val replaced = str.replace(origUri, Uri.parse(origUri).pathRaw)
val byteBuf = new UnpooledByteBufAllocator(false).buffer(replaced.size)
byteBuf.setBytes(0, replaced.toCharArray.map(_.toByte))
println(byteBuf.toString(StandardCharsets.US_ASCII))
byteBuf
} else {
buf
}
outbound.writeAndFlush(result).addListener(new ReadListener(ctx))
case _ =>
outbound.writeAndFlush(msg).addListener(new ReadListener(ctx))
}
}
}
override def channelInactive(ctx: ChannelHandlerContext): Unit = {
if(outbound != null) closeOnFlush(outbound)
}
override def exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable): Unit = {
cause.printStackTrace()
closeOnFlush(ctx.channel())
}
private[this] class ActiveListener(inbound: Channel) extends ChannelFutureListener {
override def operationComplete(future: ChannelFuture): Unit = {
if(future.isSuccess) inbound.read() else inbound.close()
}
}
}
object NettyProxyFrontendHandler {
def closeOnFlush(ch: Channel): Unit = {
if(ch.isActive) {
ch.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE)
}
}
}
class NettyProxyBackendHandler(inboundChannel: Channel) extends ChannelInboundHandlerAdapter {
override def channelActive(ctx: ChannelHandlerContext): Unit = {
ctx.read()
ctx.write(Unpooled.EMPTY_BUFFER)
}
override def channelRead(ctx: ChannelHandlerContext, msg: Any): Unit = {
inboundChannel.writeAndFlush(msg).addListener(new ReadListener(ctx))
}
override def channelInactive(ctx: ChannelHandlerContext): Unit = {
NettyProxyFrontendHandler.closeOnFlush(inboundChannel)
}
override def exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable): Unit = {
cause.printStackTrace()
NettyProxyFrontendHandler.closeOnFlush(ctx.channel())
}
}
private class ReadListener(ctx: ChannelHandlerContext) extends ChannelFutureListener {
override def operationComplete(future: ChannelFuture): Unit = {
if(future.isSuccess) ctx.channel().read() else future.channel().close()
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment