Skip to content

Instantly share code, notes, and snippets.

@brad-anton
Last active November 27, 2018 16:30
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save brad-anton/8efc396d867af28a1914f3fb918ea202 to your computer and use it in GitHub Desktop.
Save brad-anton/8efc396d867af28a1914f3fb918ea202 to your computer and use it in GitHub Desktop.
This is a creative way to force Python's requests module to issue a proxy CONNECT over HTTPS. It also lets you define proxy headers and other goodies.
"""
requests_connect_over_https.py
@brad_anton
This is a creative way to force Python's requests module to issue a proxy
CONNECT over HTTPS. It also lets you define proxy headers and other goodies.
Warning: this has only been partially tested.
"""
from requests import Session
from requests.adapters import HTTPAdapter, DEFAULT_POOLBLOCK
from requests.packages.urllib3 import PoolManager, proxy_from_url, ProxyManager
from requests.packages.urllib3.connection import VerifiedHTTPSConnection, RECENT_DATE
from requests.packages.urllib3.util.ssl_ import create_urllib3_context, resolve_ssl_version, resolve_cert_reqs, ssl_wrap_socket
from ssl import PROTOCOL_TLSv1
import datetime
import logging
# Bind the HTTP client module as ``http_client`` and the 200 status code as
# ``OK`` on both Python 3 and Python 2.
try:
    import http.client as http_client
    # ``http.HTTPStatus`` is a class, not a submodule, so it cannot be used
    # as an import path; ``http.client`` exposes the same constant directly.
    from http.client import OK
except ImportError:
    # Python 2
    import httplib as http_client
    from httplib import OK

# Enable debug output on every HTTPConnection created in this process
# (class-level attribute, so it is a global switch).
http_client.HTTPConnection.debuglevel = 1
class ConnectOverHTTPS(VerifiedHTTPSConnection):
    """HTTPS connection that sends the proxy CONNECT *after* the TLS wrap.

    ``connect()`` opens the socket, wraps it with ``ssl_wrap_socket`` and only
    then calls ``_tunnel()``, so the CONNECT request is written to the
    TLS-wrapped socket instead of the plain one.

    NOTE(review): nothing in this class performs a second TLS handshake with
    the tunnelled host once CONNECT succeeds -- confirm that this is what the
    caller expects.
    """

    def _tunnel(self):
        """Send the CONNECT request on ``self.sock`` and consume the reply.

        Writes the CONNECT line followed by every entry of
        ``self._tunnel_headers``, reads the status line, then reads and
        discards the response header lines.

        Raises:
            OSError: if the proxy answers with a status other than 200; the
                connection is closed before raising.
            http_client.LineTooLong: if a response header line is longer than
                ``http_client._MAXLINE`` bytes.
        """
        connect_str = "CONNECT %s:%d HTTP/1.0\r\n" % (self._tunnel_host,
                                                      self._tunnel_port)
        self.send(connect_str.encode("ascii"))
        for header, value in self._tunnel_headers.items():
            header_str = "%s: %s\r\n" % (header, value)
            self.send(header_str.encode("latin-1"))
        self.send(b'\r\n')

        response = self.response_class(self.sock, method=self._method)
        (version, code, message) = response._read_status()

        if code != http_client.OK:
            self.close()
            raise OSError("Tunnel connection failed: %d %s" % (code,
                                                               message.strip()))

        # _MAXLINE and LineTooLong are defined by the HTTP client module; the
        # file never imports them as bare names, so reach them via the module.
        max_line = http_client._MAXLINE
        while True:
            line = response.fp.readline(max_line + 1)
            if len(line) > max_line:
                raise http_client.LineTooLong("header line")
            if line in (b'\r\n', b'\n', b''):
                # A blank line ends the headers; b'' covers peers that hit
                # EOF without sending the terminating blank line.
                break
            if self.debuglevel > 0:
                print('header:', line.decode())

    def _do_connect(self, conn):
        """Run the CONNECT handshake over *conn* and pick a server hostname.

        Args:
            conn: the socket to tunnel over; ``connect()`` passes the socket
                returned by ``ssl_wrap_socket``.

        Returns:
            ``self.server_hostname`` when it is set, otherwise the tunnel
            host when a tunnel is configured, otherwise ``self.host``.
        """
        # Bind a fallback first so the name exists when no tunnel is set.
        hostname = self.host
        if self._tunnel_host:
            self.sock = conn
            self._tunnel()
            # Upstream urllib3 sets ``self.auto_open = 0`` here to mark the
            # connection as not reusable; that line is disabled in this file.
            # Override the host with the one we're requesting data from.
            hostname = self._tunnel_host

        override = getattr(self, 'server_hostname', None)
        return hostname if override is None else override

    def connect(self):
        """Open the socket, wrap it in TLS, then issue the proxy CONNECT.

        Adapted from urllib3's ``VerifiedHTTPSConnection.connect``; the
        tunnel step is moved after the TLS wrap (see ``_do_connect``).
        """
        # Function-scope imports: both names are needed below but are not
        # imported at the top of the file.
        import warnings
        from requests.packages.urllib3.exceptions import SystemTimeWarning

        conn = self._new_conn()

        if datetime.date.today() < RECENT_DATE:
            warnings.warn((
                'System time is way off (before {0}). This will probably '
                'lead to SSL verification errors').format(RECENT_DATE),
                SystemTimeWarning
            )

        # Wrap socket using verification with the root certs in
        # trusted_root_certs
        if getattr(self, 'ssl_context', None) is None:
            self.ssl_context = create_urllib3_context(
                ssl_version=resolve_ssl_version(self.ssl_version),
                cert_reqs=resolve_cert_reqs(self.cert_reqs),
            )
        context = self.ssl_context
        context.verify_mode = resolve_cert_reqs(self.cert_reqs)

        # The TLS server name is the host this connection object was created
        # for (presumably the proxy when a tunnel is configured -- verify).
        self.sock = ssl_wrap_socket(
            sock=conn,
            keyfile=self.key_file,
            certfile=self.cert_file,
            ca_certs=self.ca_certs,
            ca_cert_dir=self.ca_cert_dir,
            server_hostname=self.host,
            ssl_context=context)

        # Important to move the _tunnel() call after socket established
        self._do_connect(self.sock)

        # HACK: upstream urllib3 follows the wrap with assert_fingerprint /
        # hostname matching and derives is_verified from the context's
        # verify_mode. Those checks are skipped here and the flag is forced.
        # NOTE(review): the peer certificate's hostname is therefore not
        # matched by this method -- restore the upstream checks before
        # relying on this outside of experiments.
        self.is_verified = True
class ConnectOverHTTPSAdapter(HTTPAdapter):
    """Transport adapter whose proxy managers create ``ConnectOverHTTPS``
    connections for the ``https`` scheme."""

    # Can also get access to proxy headers here.
    def proxy_manager_for(self, proxy, **proxy_kwargs):
        """Return the proxy manager for *proxy*, patched to tunnel over TLS.

        Args:
            proxy: proxy URL, forwarded to ``HTTPAdapter.proxy_manager_for``.
            **proxy_kwargs: forwarded unchanged to the parent implementation.

        Returns:
            The manager built by the parent class, carrying its own
            scheme-to-pool mapping whose ``https`` pool class uses
            ``ConnectOverHTTPS`` as ``ConnectionCls``.
        """
        manager = super(ConnectOverHTTPSAdapter, self).proxy_manager_for(
            proxy, **proxy_kwargs)

        https_pool_cls = manager.pool_classes_by_scheme['https']
        # The parent may hand back a manager that was already patched on an
        # earlier call; only patch once.
        if https_pool_cls.ConnectionCls is not ConnectOverHTTPS:
            # Assigning ConnectionCls on the pool class itself would change
            # it for every user of that class in the process. Subclass it
            # and give this manager a private copy of the mapping instead.
            pool_classes = dict(manager.pool_classes_by_scheme)
            pool_classes['https'] = type(
                'ConnectOverHTTPSPool',
                (https_pool_cls,),
                {'ConnectionCls': ConnectOverHTTPS},
            )
            manager.pool_classes_by_scheme = pool_classes
        return manager
def get(url, proxies):
    """Issue a GET for *url* through *proxies* and return the status code.

    A fresh ``Session`` is created for the call, with
    ``ConnectOverHTTPSAdapter`` mounted for the ``https://`` prefix, and is
    closed again before returning.
    """
    with Session() as session:
        session.mount('https://', ConnectOverHTTPSAdapter())
        session.proxies = proxies
        return session.get(url).status_code
if __name__ == '__main__':
    # Demo entry point: fetch one URL through a placeholder HTTPS proxy with
    # debug logging switched on.
    url = 'https://www.google.com'
    proxies = {'https': 'https://some_proxy:443'}

    # Root logger at DEBUG, with the urllib3 logger bundled in requests
    # propagating its records up to it.
    logging.basicConfig()
    logging.getLogger().setLevel(logging.DEBUG)
    urllib3_log = logging.getLogger("requests.packages.urllib3")
    urllib3_log.setLevel(logging.DEBUG)
    urllib3_log.propagate = True

    print(url, get(url, proxies))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment