Created
December 27, 2014 13:04
-
-
Save ponkotuy/344ca65eb411a64c7e96 to your computer and use it in GitHub Desktop.
NettyProxySample
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.ponkotuy.proxy | |
import java.nio.charset.StandardCharsets | |
import com.netaporter.uri.Uri | |
import io.netty.bootstrap.{Bootstrap, ServerBootstrap} | |
import io.netty.buffer.{ByteBuf, Unpooled, UnpooledByteBufAllocator} | |
import io.netty.channel._ | |
import io.netty.channel.nio.NioEventLoopGroup | |
import io.netty.channel.socket.SocketChannel | |
import io.netty.channel.socket.nio.NioServerSocketChannel | |
import io.netty.handler.logging.{LogLevel, LoggingHandler} | |
/**
 * Entry point of the sample proxy.
 *
 * Binds a Netty server on local port 8080 and hands every accepted connection
 * to a [[NettyProxyInitializer]] targeting 125.6.189.39:80. Accepted channels
 * are created with AUTO_READ disabled, so their handlers must request each
 * read explicitly. The application blocks until the server channel is closed
 * and then shuts both event loop groups down.
 *
 * @author ponkotuy
 * Date: 14/12/27.
 */
object NettyProxy extends App {
  val boss = new NioEventLoopGroup(1)
  val worker = new NioEventLoopGroup()

  try {
    val server = new ServerBootstrap()
    server.group(boss, worker)
    server.channel(classOf[NioServerSocketChannel])
    server.handler(new LoggingHandler(LogLevel.INFO))
    server.childHandler(new NettyProxyInitializer("125.6.189.39", 80))
    server.childOption(ChannelOption.AUTO_READ, java.lang.Boolean.FALSE)

    // Wait for the bind to finish, then park until the listening socket closes.
    val listening = server.bind(8080).sync().channel()
    listening.closeFuture().sync()
  } finally {
    boss.shutdownGracefully()
    worker.shutdownGracefully()
  }
}
/**
 * Sets up the pipeline of every accepted client channel: a logging handler
 * followed by the frontend handler that relays traffic to the remote peer.
 *
 * @param remoteHost host the frontend handler connects to
 * @param remotePort port the frontend handler connects to
 */
class NettyProxyInitializer(remoteHost: String, remotePort: Int) extends ChannelInitializer[SocketChannel] {
  override def initChannel(ch: SocketChannel): Unit = {
    val logging = new LoggingHandler(LogLevel.INFO)
    val frontend = new NettyProxyFrontendHandler(remoteHost, remotePort)
    ch.pipeline().addLast(logging).addLast(frontend)
  }
}
/**
 * Handles the client-facing side of one proxied connection.
 *
 * When the client channel becomes active, an outbound connection to
 * `remoteHost:remotePort` is opened on the same event loop. Everything read
 * from the client is then forwarded to that outbound channel; the next client
 * read is only requested once the forwarded write has completed.
 *
 * For `POST` requests the request target on the request line is replaced by
 * its raw path (as computed by `Uri.parse(...).pathRaw`) before forwarding.
 *
 * @param remoteHost host of the remote peer to connect to
 * @param remotePort port of the remote peer to connect to
 */
class NettyProxyFrontendHandler(remoteHost: String, remotePort: Int) extends ChannelInboundHandlerAdapter {
  import com.ponkotuy.proxy.NettyProxyFrontendHandler._

  /** Channel to the remote peer; `null` until `channelActive` has run. */
  @volatile var outbound: Channel = null

  /** Starts the outbound connection and requests the first client read once it succeeds. */
  override def channelActive(ctx: ChannelHandlerContext): Unit = {
    val inbound = ctx.channel()
    val b = new Bootstrap()
    b.group(inbound.eventLoop())
      .channel(ctx.channel().getClass)
      .handler(new NettyProxyBackendHandler(inbound))
      .option(ChannelOption.AUTO_READ, java.lang.Boolean.FALSE)
    val f = b.connect(remoteHost, remotePort)
    outbound = f.channel()
    f.addListener(new ActiveListener(inbound))
  }

  /**
   * Forwards `msg` to the outbound channel, rewriting POST request lines on the way.
   * If the outbound channel is not usable the message is released instead of
   * being silently dropped, so reference-counted buffers are not leaked.
   */
  override def channelRead(ctx: ChannelHandlerContext, msg: Any): Unit = {
    if (outbound != null && outbound.isActive) {
      val forwarded: Any = msg match {
        case buf: ByteBuf => rewritePostRequestLine(buf)
        case other => other
      }
      outbound.writeAndFlush(forwarded).addListener(new ReadListener(ctx))
    } else {
      // Nobody downstream will consume the message, so release it here.
      io.netty.util.ReferenceCountUtil.release(msg)
    }
  }

  /** Closes the outbound channel (after flushing) when the client goes away. */
  override def channelInactive(ctx: ChannelHandlerContext): Unit = {
    if (outbound != null) closeOnFlush(outbound)
  }

  /** Prints the failure and closes the client channel after flushing. */
  override def exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable): Unit = {
    cause.printStackTrace()
    closeOnFlush(ctx.channel())
  }

  /**
   * Returns the buffer to forward for `buf`.
   *
   * If the first line of the data consists of exactly three space-separated
   * tokens and the first one is `POST`, a new buffer is returned in which the
   * second token (the request target) is replaced by its raw path, and `buf`
   * is released. Any other data (other methods, body continuation chunks,
   * empty buffers) is returned unchanged.
   *
   * ISO-8859-1 is used for the byte/char round trip because it maps every
   * byte to exactly one char and back, so non-ASCII body bytes survive.
   */
  private[this] def rewritePostRequestLine(buf: ByteBuf): ByteBuf = {
    val str = buf.toString(StandardCharsets.ISO_8859_1)
    val requestLine = str.takeWhile(c => c != '\r' && c != '\n')
    requestLine.split(' ') match {
      case Array("POST", origUri, _) =>
        try {
          // Only touch the request line: the target starts right after "POST ".
          val targetStart = "POST ".length
          val replaced =
            str.substring(0, targetStart) +
              Uri.parse(origUri).pathRaw +
              str.substring(targetStart + origUri.length)
          println(replaced)
          // copiedBuffer sets writerIndex, so the content is actually readable/sendable.
          Unpooled.copiedBuffer(replaced, StandardCharsets.ISO_8859_1)
        } finally {
          // The original buffer is replaced (or parsing failed); either way we own its release.
          buf.release()
        }
      case _ =>
        buf
    }
  }

  /** Requests the first read from the client once the outbound connect succeeded, else closes it. */
  private[this] class ActiveListener(inbound: Channel) extends ChannelFutureListener {
    override def operationComplete(future: ChannelFuture): Unit = {
      if (future.isSuccess) inbound.read() else inbound.close()
    }
  }
}
/** Helpers shared by the frontend and backend proxy handlers. */
object NettyProxyFrontendHandler {

  /**
   * Closes `ch` once an empty buffer written to it has been flushed.
   * Does nothing when the channel is not active.
   *
   * @param ch channel to flush and close
   */
  def closeOnFlush(ch: Channel): Unit =
    if (ch.isActive) {
      val flushed = ch.writeAndFlush(Unpooled.EMPTY_BUFFER)
      flushed.addListener(ChannelFutureListener.CLOSE)
    }
}
/**
 * Handles the remote-facing side of one proxied connection: everything read
 * from the remote peer is written back to the client channel.
 *
 * @param inboundChannel the client channel that responses are relayed to
 */
class NettyProxyBackendHandler(inboundChannel: Channel) extends ChannelInboundHandlerAdapter {

  /** Requests the first read from the remote peer and writes an empty buffer. */
  override def channelActive(ctx: ChannelHandlerContext): Unit = {
    ctx.read()
    ctx.write(Unpooled.EMPTY_BUFFER)
  }

  /** Relays `msg` to the client; the attached listener reacts once the write completes. */
  override def channelRead(ctx: ChannelHandlerContext, msg: Any): Unit = {
    val relayed = inboundChannel.writeAndFlush(msg)
    relayed.addListener(new ReadListener(ctx))
  }

  /** Flushes and closes the client channel when the remote side goes away. */
  override def channelInactive(ctx: ChannelHandlerContext): Unit =
    NettyProxyFrontendHandler.closeOnFlush(inboundChannel)

  /** Prints the failure, then flushes and closes this (remote-facing) channel. */
  override def exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable): Unit = {
    cause.printStackTrace()
    val failed = ctx.channel()
    NettyProxyFrontendHandler.closeOnFlush(failed)
  }
}
/**
 * Write-completion listener used for flow control: after a successful write
 * it requests the next read on the channel of `ctx`; after a failed write it
 * closes the channel the write was issued on.
 *
 * @param ctx context whose channel should be read from next
 */
private class ReadListener(ctx: ChannelHandlerContext) extends ChannelFutureListener {
  override def operationComplete(future: ChannelFuture): Unit = {
    if (!future.isSuccess) {
      future.channel().close()
    } else {
      ctx.channel().read()
    }
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment