Skip to content

Instantly share code, notes, and snippets.

@spockz
Last active July 15, 2016 19:16
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save spockz/ebf0ccb47c912a21b924efc45d120859 to your computer and use it in GitHub Desktop.
Save spockz/ebf0ccb47c912a21b924efc45d120859 to your computer and use it in GitHub Desktop.
Netty3 version of HttpProxyConnectHandler
package com.twitter.finagle.netty3.proxy
import com.twitter.finagle._
import com.twitter.finagle.client.Transporter
import com.twitter.finagle.client.Transporter.Credentials
import com.twitter.io.Charsets
import com.twitter.util.Base64StringEncoder
import java.net.{InetSocketAddress, SocketAddress}
import java.util.concurrent.atomic.AtomicReference
import org.jboss.netty.channel._
import org.jboss.netty.handler.codec.http._
import org.jboss.netty.handler.queue.BufferedWriteHandler
/**
* An internal handler that upgrades the pipeline to delay connect-promise satisfaction until the
* remote HTTP proxy server is ready to proxy traffic to an ultimate destination represented as
* `host` (i.e., HTTP proxy connect procedure is successful).
*
* This enables "Tunneling TCP-based protocols (i.e., TLS/SSL) through Web proxy servers" [1] and
* may be used with any TCP traffic, not only HTTP(S). See Squid documentation on this feature [2].
*
* @note We don't use Netty's implementation [3] here because it supports an opposite direction: the
* destination passed to `Channel.connect` is an ultimate target and the `HttpProxyHandler`
* is supposed to replace it with proxy addr (represented as a `SocketAddress`). This is the
* exact approach we used for Netty 3 implementation, but we don't do that anymore because we
* don't want to bypass Finagle's load balancers while resolving the proxy endpoint.
* @note This mixes in a [[BufferingChannelOutboundHandler]] so we can protect ourselves from
* channel handlers that write on `channelAdded` or `channelActive`.
*
* [1]: http://www.web-cache.com/Writings/Internet-Drafts/draft-luotonen-web-proxy-tunneling-01.txt
* [2]: http://wiki.squid-cache.org/Features/HTTPS
* [3]: https://github.com/netty/netty/blob/4.1/handler-proxy/src/main/java/io/netty/handler/proxy/HttpProxyHandler.java
* @param host the ultimate host a remote proxy server connects to
* @param credentialsOption optional credentials for a proxy server
*/
private[netty3] class HttpProxyConnectHandler(
host: String,
credentialsOption: Option[Transporter.Credentials],
httpClientCodec: ChannelHandler = new HttpClientCodec()) // exposed for testing
extends BufferedWriteHandler
// with ConnectPromiseDelayListeners
{ self =>
private[this] val httpCodecKey: String = "http proxy client codec"
// private[this] var connectPromise: ChannelPromise = _
private[this] val connectFuture = new AtomicReference[ChannelFuture](null)
private[this] def proxyAuthorizationHeader(c: Credentials): String = {
val bytes = "%s:%s".format(c.username, c.password).getBytes(Charsets.Utf8)
"Basic " + Base64StringEncoder.encode(bytes)
}
private[this] def fail(ctx: ChannelHandlerContext, t: SocketAddress => Throwable): Unit = {
// We "try" because it might be already cancelled and we don't need to handle
// cancellations here - it's already done by `proxyCancellationsTo`.
connectFuture.get().setFailure(t(ctx.getChannel.getRemoteAddress))
}
private[this] def fail(c: Channel, t: SocketAddress => Throwable) {
Option(connectFuture.get) foreach { _.setFailure(t(c.getRemoteAddress)) }
Channels.close(c)
}
override def connectRequested(ctx: ChannelHandlerContext, e: ChannelStateEvent): Unit = {
println("entering connectRequested")
e match {
case de: DownstreamChannelStateEvent =>
if (!connectFuture.compareAndSet(null, e.getFuture)) {
fail(ctx.getChannel, (remoteAddress : SocketAddress) => new InconsistentStateException(remoteAddress))
return
}
// proxy cancellation
val wrappedConnectFuture = Channels.future(de.getChannel, true)
de.getFuture.addListener(new ChannelFutureListener {
def operationComplete(f: ChannelFuture) {
if (f.isCancelled)
wrappedConnectFuture.cancel()
}
})
// Proxy failures here so that if the connect fails, it is
// propagated to the listener, not just on the channel.
wrappedConnectFuture.addListener(new ChannelFutureListener {
def operationComplete(f: ChannelFuture) {
if (f.isSuccess || f.isCancelled) {
return
}
fail(f.getChannel, (_: SocketAddress) => f.getCause)
}
})
// We propagate the pipeline with a new promise thereby delaying the original connect's
// satisfaction.
// TODO: Figure out why the remoteAddress is
val wrappedEvent = new DownstreamChannelStateEvent(
de.getChannel, wrappedConnectFuture,
de.getState, de.getValue)
println(s"Connecting to ${wrappedEvent.getChannel.getRemoteAddress} with $wrappedEvent")
super.connectRequested(ctx, wrappedEvent)
}
}
override def channelConnected(ctx: ChannelHandlerContext, e: ChannelStateEvent): Unit = {
println("channelConnected")
if (connectFuture.get eq null) {
fail(ctx, (remoteAddress: SocketAddress) => new InconsistentStateException(remoteAddress))
return
}
// Add HTTP client codec so we can talk to an HTTP proxy.
ctx.getPipeline().addBefore(ctx.getName(), httpCodecKey, httpClientCodec)
val httpProxyConnectRequestFuture = Channels.future(e.getChannel, true)
// Create new connect HTTP proxy connect request.
val req = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.CONNECT, host)
req.headers().set(HttpHeaders.Names.HOST, host)
credentialsOption.foreach(c =>
req.headers().add(HttpHeaders.Names.PROXY_AUTHORIZATION, proxyAuthorizationHeader(c))
)
println(s"Sending Connect message downstream [$req]")
ctx.sendDownstream(new DownstreamMessageEvent(ctx.getChannel, httpProxyConnectRequestFuture, req, ctx.getChannel.getRemoteAddress))
// httpProxyConnectRequestFuture.addListener(???)
// proxy cancellations again.
connectFuture.get.addListener(new ChannelFutureListener {
override def operationComplete(f: ChannelFuture): Unit =
if (f.isCancelled) {
fail(ctx.getChannel, (remoteAddress : SocketAddress) =>new ChannelClosedException(remoteAddress))
}
})
//perhaps read is necessary
}
override def messageReceived(ctx: ChannelHandlerContext, messageEvent: MessageEvent): Unit = {
println(s"Received Message: $messageEvent")
messageEvent.getMessage match {
case rep: DefaultHttpResponse =>
println(s"Received response: ${rep.toString}")
// A remote HTTP proxy is ready to proxy traffic to an ultimate destination. We no longer
// need HTTP proxy pieces in the pipeline.
if (rep.getStatus == HttpResponseStatus.OK) {
ctx.getPipeline.remove(httpCodecKey)
ctx.getPipeline.remove(self) // drains pending writes when removed
connectFuture.get.setSuccess()
// We don't release `req` since by specs, we don't expect any payload sent back from a
// a web proxy server.
} else {
val failure = (remote: SocketAddress) => new ConnectionFailedException(
Failure(s"Unexpected status returned from an HTTP proxy server: ${rep.getStatus()}."),
remote
)
fail(ctx, failure)
ctx.getChannel.close()
}
case _ => ctx.sendUpstream(messageEvent)
// case other => ctx.fireChannelRead(other)
}
}
//
// override def exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable): Unit = {
// fail(ctx, (_:SocketAddress) => cause)
//// TODO: What to do with: ctx.fireExceptionCaught(cause) // we don't call super.exceptionCaught since we've already filed
// // both connect promise and pending writes in `fail`
//
//// TODO: What to do with: ctx.close() // close a channel since we've failed to perform an HTTP proxy handshake
// }
// override def channelInactive(ctx: ChannelHandlerContext): Unit = {
// fail(ctx, new ChannelClosedException(ctx.getChannel().getRemoteAddress()))
// ctx.fireChannelInactive()
// }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment