Skip to content

Instantly share code, notes, and snippets.

@adiroiban
Last active October 10, 2017 23:10
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save adiroiban/559699241594637f11c9c94af9cb1c8e to your computer and use it in GitHub Desktop.
Save adiroiban/559699241594637f11c9c94af9cb1c8e to your computer and use it in GitHub Desktop.
Twisted HTTP Connect Proxy
"""
Based on the code from Scrapy
BSD-3-Clause
"""
from __future__ import absolute_import, unicode_literals
import re
from urlparse import urlparse
from twisted.internet import defer
from twisted.internet.endpoints import TCP4ClientEndpoint
from twisted.internet.ssl import ClientContextFactory
def proxify_http_connect_agent(agent, proxy):
    """
    Make `agent` open its connections through an HTTP CONNECT tunnel.

    The agent is modified in place: its `_getEndpoint` is replaced with a
    function that returns an `_HTTPConnectClientEndpoint` pointing at the
    proxy. Nothing is returned.

    `proxy` is the proxy configuration as an URI using the `connect`
    scheme, for example `connect://proxy.example.com:3128`.
    When the URI has no port, 8080 is used.

    Raises `TypeError` when the URI scheme is not `connect`.
    Raises `ValueError` when the URI has no host.
    """
    parsed = urlparse(proxy)

    scheme = parsed.scheme
    if scheme != 'connect':
        raise TypeError('Unsupported proxy type: %s' % (scheme,))

    host = parsed.hostname
    if not host:
        raise ValueError('Proxy URL requires a host.')

    # A missing port falls back to the default proxy port.
    port = parsed.port or 8080

    def tunneled_endpoint(uri):
        """
        Replacement for the agent's `_getEndpoint`.

        See: twisted.web.client.Agent.
        """
        # The reactor is read from the agent only when an endpoint is
        # requested, not when the agent is patched.
        return _HTTPConnectClientEndpoint(
            agent._reactor, uri, proxyHost=host, proxyPort=port)

    agent._getEndpoint = tunneled_endpoint
class _HTTPConnectClientEndpoint(TCP4ClientEndpoint):
    """
    An endpoint that tunnels through HTTP/1.1 proxy to allow HTTPS downloads.

    To accomplish that, this endpoint sends an HTTP CONNECT to the proxy.
    CONNECT is sent for each new connection.
    If a connection is reused in a pool, CONNECT is not re-sent as no new
    connection is made.

    When CONNECT fails the transport is closed to prevent keeping it in the
    pool and being reused without a previous successful CONNECT.

    The TCP connection is made to `proxyHost`:`proxyPort`, while the
    CONNECT request targets the host and port of `uri`.

    See: https://tools.ietf.org/html/rfc2817#section-5.2
    """

    # Matches the start of the status line of a successful proxy response.
    # Raw bytes literal so that `\.` is a regex escape and not a string one.
    _responseMatcher = re.compile(br'HTTP/1\.. 200')

    # Blank line which ends the header section of the proxy response.
    _headersEnd = b'\r\n\r\n'

    def __init__(self, reactor, uri, proxyHost, proxyPort):
        """
        `reactor`, `proxyHost` and `proxyPort` are passed to the TCP4
        endpoint, `uri` is the URI of the request to be tunneled.
        """
        super(_HTTPConnectClientEndpoint, self).__init__(
            reactor, proxyHost, proxyPort)
        # Called when the request was made or failed via the HTTP proxy.
        self._tunnelReadyDeferred = defer.Deferred()
        # URI of the original request.
        self._uri = uri
        # Factory used for the tunneled connection.
        self._protocolFactory = None
        # The protocol which should be tunneled.
        self._protocol = None
        # Original `dataReceived` of the protocol, kept while we intercept
        # the response to CONNECT.
        self._protocolDataReceived = None
        # Bytes received so far as response to CONNECT.
        self._connectResponse = b''
        # FIXME:
        # Add here your own context factory
        # SSL context factory used by the tunneled connection.
        self._contextFactory = ClientContextFactory()

    def _cbProxyConnect(self, protocol):
        """
        Called when we get a new connection to the proxy.

        Sends the CONNECT request and redirects the data received by
        `protocol` to this endpoint until the tunnel is ready.

        Returns the same `protocol`.
        """
        # Send the HTTP CONNECT to initiate the tunnel.
        tunnelCommand = (
            b'CONNECT %(host)s:%(port)s HTTP/1.1\r\n'
            b'User-Agent: Twisted Proxy\r\n'
            b'Proxy-Connection: keep-alive\r\n'
            b'Connection: keep-alive\r\n'
            b'Host: %(host)s:%(port)s\r\n'
            b'\r\n'
            ) % {'host': self._uri.host, 'port': self._uri.port}
        protocol.transport.write(tunnelCommand)
        # Intercept the response.
        self._protocolDataReceived = protocol.dataReceived
        protocol.dataReceived = self._gotHTTPConnectResponse
        self._protocol = protocol
        return protocol

    def _gotHTTPConnectResponse(self, data):
        """
        Called when we receive data as response to the HTTP CONNECT command.

        The response can arrive split in multiple chunks. A response which
        does not start as a successful one is rejected right away, while a
        successful one is only acted upon after its whole header section
        (ending with a blank line) was received, so that no proxy header
        ends up in the TLS layer.
        """
        self._connectResponse += data
        response = self._connectResponse

        if not self._responseMatcher.match(response):
            # Switch back the original protocol data received handling as
            # we no longer need to intercept the data.
            self._protocol.dataReceived = self._protocolDataReceived
            # We close the connection to make sure it is not kept in the
            # pool.
            self._protocol.transport.loseConnection()
            # Inform the caller of `connect` about the failure.
            # NOTE(review): confirm ServerException is in scope for this
            # module; the imports next to this class only bring in `re`,
            # `urlparse` and Twisted names.
            self._tunnelReadyDeferred.errback(ServerException(
                u'Could not open the HTTP CONNECT tunnel. %r' % (
                    response[:50],)))
            return

        if self._headersEnd not in response:
            # The status is successful but the proxy headers are not yet
            # complete. Keep intercepting until the blank line arrives.
            # NOTE(review): nothing here fires the deferred if the proxy
            # drops the connection while we wait - same as when the proxy
            # never answers at all.
            return

        # Tunnel is established, so stop intercepting and hand over the
        # connection to the tunneled protocol on top of TLS.
        self._protocol.dataReceived = self._protocolDataReceived
        self._protocol.transport.startTLS(
            self._contextFactory, self._protocolFactory)
        self._tunnelReadyDeferred.callback(self._protocol)

    def connect(self, protocolFactory):
        """
        Connect to the remote host.

        The TCP connection is made to the proxy and then the CONNECT
        request is sent.

        Returns a deferred which is called with the protocol once the
        tunnel is ready, or with the failure when the connection to the
        proxy or the CONNECT request failed.
        """
        self._protocolFactory = protocolFactory
        deferred = super(_HTTPConnectClientEndpoint, self).connect(
            protocolFactory)
        deferred.addCallback(self._cbProxyConnect)
        # Failures to reach the proxy are forwarded to the tunnel deferred.
        deferred.addErrback(self._tunnelReadyDeferred.errback)
        return self._tunnelReadyDeferred
"""
Tests for HTTP proxy.
"""
from __future__ import absolute_import, unicode_literals
from twisted.internet.error import (
ConnectionRefusedError as TwistedConnectionRefusedError,
)
from chevah.server.commons.exception import ServerException
from chevah.server.http.client import PersistentAgent
from chevah.server.http.proxy import proxify_http_connect_agent
from chevah.server.testing import (
attr,
HTTPProxyContext,
HTTPServerContext,
mk,
ResponseDefinition,
ServerTestCase,
)
from chevah.server.testing.constants import SSL_DATA
class TestProxifyHTTPConnectAgent(ServerTestCase):
    """
    Unit and system tests for proxify_http_connect_agent.

    The system tests start a local proxy (HTTPProxyContext) and a local
    HTTP/HTTPS server (HTTPServerContext) and make requests using a
    PersistentAgent which was patched by proxify_http_connect_agent.
    """
    # NOTE(review): the extent of the `with` blocks below was reconstructed
    # from a copy of this file which had lost its indentation - confirm
    # against the original.

    def makeResponse(
        self, request=None, code=200, body='', phrase=None, length=None,
        method='POST'
            ):
        """
        Create a ResponseDefinition done for request.

        The definition is always for the `/` URL and is marked as
        persistent. The arguments are passed to ResponseDefinition as
        `request`, `response_code`, `response_content`, `response_message`,
        `response_length` and `method`.
        """
        response = ResponseDefinition(
            method=method,
            url='/',
            request=request,
            response_code=code,
            response_message=phrase,
            response_content=body,
            response_length=length,
            persistent=True,
            )
        return response

    def test_default(self):
        """
        Will set a default port, when not specified in the configuration.
        """
        sut = PersistentAgent()
        proxify_http_connect_agent(
            sut, proxy=u'connect://12.23.12.21')
        # The URI is not used while creating the endpoint, so any object
        # will do here.
        result = sut._getEndpoint(uri=object())
        self.assertEqual(8080, result._port)

    def test_unknown_proxy(self):
        """
        Raise an exception when initialized with an unknown proxy type.
        """
        sut = PersistentAgent()
        # NOTE(review): the proxify_http_connect_agent code published next
        # to these tests raises TypeError for this case - confirm which
        # version of the module these tests are targeting.
        exception = self.assertRaises(
            ServerException,
            proxify_http_connect_agent, sut, u'http://some.com'
            )
        self.assertEqual(u'Unsupported proxy type: http', exception.message)

    def test_proxy_no_host(self):
        """
        Raise an exception when initialized without a host.
        """
        sut = PersistentAgent()
        # NOTE(review): the proxify_http_connect_agent code published next
        # to these tests raises ValueError for this case - confirm which
        # version of the module these tests are targeting.
        exception = self.assertRaises(
            ServerException,
            proxify_http_connect_agent, sut, u'connect://'
            )
        self.assertEqual(u'Proxy URL requires a host.', exception.message)

    def test_bad_proxy_address(self):
        """
        Will raise an exception when the proxy is not found.
        """
        sut = PersistentAgent()
        body = mk.ascii()
        cert = SSL_DATA['SERVER_SELF_CERT_AND_KEY_PATH']
        response = self.makeResponse(request=body)
        # presumably nothing is listening on this local port - TODO confirm.
        proxify_http_connect_agent(
            sut, proxy=u'connect://127.0.0.1:5478')
        # We still listen on a real port to make sure the proxy is not
        # bypassed.
        with HTTPServerContext([response], cert=cert) as server:
            url = 'https://%s:%s' % (server.ip, server.port)
            deferred = sut.post(url=url, body=body, headers={})
            failure = self.getDeferredFailure(deferred)

        # NOTE(review): arguments are passed as (expected type, value),
        # presumably the order used by the ServerTestCase helper - confirm.
        self.assertIsInstance(TwistedConnectionRefusedError, failure.value)

    def test_proxy_CONNECT_not_accepted(self):
        """
        Will raise an exception when the proxy is found but does not
        accept the CONNECT request.
        """
        sut = PersistentAgent()
        reason = b"Not accepted ('CONNECT')"
        with HTTPProxyContext(reject_reason=reason) as proxy:
            proxify_http_connect_agent(
                sut,
                proxy=u'connect://%s:%s' % (proxy.ip, proxy.port),
                )
            # We still listen on a real port to make sure the proxy is not
            # bypassed.
            with HTTPServerContext([]) as server:
                url = 'http://%s:%s' % (server.ip, server.port)
                deferred = sut.post(url=url, body=mk.ascii(), headers={})
                failure = self.getDeferredFailure(deferred, prevent_stop=True)

        self.executeReactor()
        self.assertIsInstance(ServerException, failure.value)
        # The error contains the start of the response received from the
        # proxy.
        self.assertEqual(
            u'Could not open the HTTP CONNECT tunnel. '
            u'"HTTP/1.1 501 Not accepted (\'CONNECT\')\\r\\n"',
            failure.value.message,
            )

    def test_proxy_CONNECT_TLS_failure(self):
        """
        Once the CONNECT is successful it will trigger the TLS handshake and
        if it fails, it will not cache the connection.
        """
        sut = PersistentAgent()
        body = mk.ascii()
        response = self.makeResponse(request=body)
        with HTTPProxyContext() as proxy:
            proxify_http_connect_agent(
                sut,
                proxy=u'connect://%s:%s' % (proxy.ip, proxy.port),
                )
            # We are using an HTTP only server which should fail during the
            # TLS handshake.
            with HTTPServerContext([response]) as server:
                url = 'http://%s:%s' % (server.ip, server.port)
                deferred = sut.post(url=url, body=body, headers={})
                failure = self.getDeferredFailure(deferred, prevent_stop=True)

        self.executeReactor()
        # We have a non-HTTPS server, so the connection fails.
        self.assertIsInstance(ServerException, failure.value)
        # The text of the TLS error is different across the test systems.
        if self.os_name == 'aix':  # noqa:cover
            # FIXME:4172:
            # We don't have coverage reporting on all systems.
            # On AIX we don't get the error details.
            self.assertContains(u"[]", failure.value.message)
        elif (
            self.os_name == 'windows' or
            self.os_version in ['arch', 'osx-10.8']
                ):
            self.assertContains(
                'wrong version number', failure.value.message)
        else:
            self.assertContains(
                u"'SSL23_GET_SERVER_HELLO', 'unknown protocol'",
                failure.value.message,
                )

    @attr('slow')
    def test_proxy_CONNECT_TLS_ok(self):
        """
        Once the CONNECT is successful it will trigger the TLS handshake and
        return the content of the page and keep the connection active.
        """
        cert = SSL_DATA['SERVER_SELF_CERT_AND_KEY_PATH']
        body = mk.ascii()
        response = self.makeResponse(method='GET', body=body)
        sut = PersistentAgent()
        with HTTPProxyContext() as proxy:
            proxify_http_connect_agent(
                sut,
                proxy=u'connect://%s:%s' % (proxy.ip, proxy.port),
                )
            with HTTPServerContext([response], cert=cert) as https:
                url = u'https://%s:%s/' % (https.ip, https.port)
                deferred = sut.get(url=url, headers={})
                # From here on `response` is the response received by the
                # agent and no longer the definition created above.
                response = self.getDeferredResult(
                    deferred,
                    timeout=self.DEFERRED_TIMEOUT + 4,
                    prevent_stop=True,
                    )
                result = self.getDeferredResult(
                    sut.readBody(response),
                    prevent_stop=True,
                    )
                self.assertEqual(body, result)
                # Connection is kept alive.
                self.assertIsNotEmpty(sut._pool._connections)
                # Closing the persistent connections leaves the pool empty.
                deferred = sut.closePersistentConnections()
                self.getDeferredResult(deferred, timeout=self.DEFERRED_TIMEOUT + 4)
                self.assertIsEmpty(sut._pool._connections)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment