Skip to content

Instantly share code, notes, and snippets.

@Tey
Last active October 7, 2023 04:31
Show Gist options
  • Star 4 You must be signed in to star a gist
  • Fork 2 You must be signed in to fork a gist
  • Save Tey/ded2486da8a69bb84d2dce94e2e09ff7 to your computer and use it in GitHub Desktop.
Save Tey/ded2486da8a69bb84d2dce94e2e09ff7 to your computer and use it in GitHub Desktop.
HTTP proxy digest authentication for requests lib which works with HTTPS websites
#!/usr/bin/env python
import sys
import re
if sys.version_info[0] == 2:
import httplib
import urlparse
else:
import http.client as httplib
import urllib.parse as urlparse
import requests
from requests.packages import urllib3
from requests.compat import urlparse as _urlparse
def dbg_print(fmt, *args):
    """Write a debug trace line to stderr when tracing is switched on.

    Tracing is off by default; set ``dbg_print.enabled = True`` to turn it on.

    :param fmt: message text; used as a %-format string when ``args`` is given.
    :param args: values interpolated into ``fmt``.
    """
    if not dbg_print.enabled:
        return
    # Interpolate only when there is something to interpolate, so that a
    # ready-made message containing a literal '%' does not raise.
    message = fmt % args if args else fmt
    sys.stderr.write(message)
    sys.stderr.write("\n")


# Tracing stays disabled unless a developer flips this switch.
dbg_print.enabled = False
# HTTP Digest authentication for proxy
# The main differences with HTTPDigestAuth are:
# - Proxy HTTP status is 407 when authentication is required (instead of 401)
# - The related request/response headers are Proxy-Authorization/Proxy-Authenticate (instead of
# Authorization/WWW-Authenticate)
# - When proxying HTTPS, the method is CONNECT and the uri is the address:port of the webserver
# - A webserver authentication instance can be associated with HTTPProxyDigestAuth so that proxy
# authentication does not prevent webserver authentication
class HTTPProxyDigestAuth(requests.auth.HTTPDigestAuth):
    """Digest authentication against an HTTP proxy (407 / Proxy-Authenticate).

    An optional ``auth`` object for the target website can be supplied; it is
    applied to each request after the proxy hooks have been registered.
    """
    # self._thread_local.num_401_calls should be self._thread_local.num_407_calls
    # handle_401() should be handle_407()
    # We keep the original names to reduce the changes count

    def __init__(self, username, password, auth=None):
        """
        :param username: proxy user name.
        :param password: proxy password.
        :param auth: optional auth callable for the website (not the proxy).
        """
        super(HTTPProxyDigestAuth, self).__init__(username, password)
        # AuthBase instance for website (not proxy)
        self.auth = auth

    def build_digest_header(self, method, url):
        """Build the digest header value for the proxy.

        For HTTPS targets the digest is computed for ``CONNECT host:port``
        instead of the request's own method and URL.

        :param method: HTTP method of the request.
        :param url: full URL of the request.
        :return: whatever the parent ``build_digest_header`` returns.
        """
        url_parsed = urlparse.urlparse(url)
        if url_parsed.scheme.lower() == 'https':
            # The CONNECT target is "host:port"; drop any "user:pass@" part.
            authority = url_parsed.netloc.rpartition('@')[2]
            if url_parsed.port is None:
                authority += ':443'
            # A bare "host:443" is parsed as scheme="host", path="443" by
            # urlparse on Python >= 3.9. Prefixing a throw-away scheme keeps
            # the whole authority in the path component on every version.
            # NOTE(review): relies on the parent class taking the digest uri
            # from urlparse(url).path -- confirm against the installed requests.
            url = 'connect:' + authority
            method = 'CONNECT'
        return super(HTTPProxyDigestAuth, self).build_digest_header(method, url)

    def handle_401(self, r, **kwargs):
        """
        Takes the given response and tries proxy digest-auth, if needed.

        :param r: response received for the request.
        :param kwargs: forwarded to ``r.connection.send`` when retrying.
        :rtype: requests.Response
        """
        # If response is not 407, do not auth
        if r.status_code != 407:
            self._thread_local.num_401_calls = 1
            return r

        dbg_print("handle_407")

        if self._thread_local.pos is not None:
            # Rewind the file position indicator of the body to where
            # it was to resend the request.
            r.request.body.seek(self._thread_local.pos)

        s_auth = r.headers.get('proxy-authenticate', '')

        if 'digest' in s_auth.lower() and self._thread_local.num_401_calls < 2:
            self._thread_local.num_401_calls += 1
            pat = re.compile(r'digest ', flags=re.IGNORECASE)
            self._thread_local.chal = requests.utils.parse_dict_header(
                pat.sub('', s_auth, count=1))

            # Consume content and release the original connection
            # to allow our new request to reuse the same one.
            r.content
            r.close()
            prep = r.request.copy()
            requests.cookies.extract_cookies_to_jar(prep._cookies, r.request, r.raw)
            prep.prepare_cookies(prep._cookies)

            # FIXME Put that header in the proxy headers
            prep.headers['Proxy-Authorization'] = self.build_digest_header(prep.method, prep.url)
            _r = r.connection.send(prep, **kwargs)
            _r.history.append(r)
            _r.request = prep

            return _r

        self._thread_local.num_401_calls = 1
        return r

    def __call__(self, r):
        """Attach proxy digest handling (and the website auth, if any) to ``r``.

        :param r: the prepared request being sent.
        :return: the request, possibly replaced by what ``self.auth`` returns.
        """
        # Initialize per-thread state, if needed
        self.init_per_thread_state()
        # If we have a saved nonce, skip the 407 round trip
        if self._thread_local.last_nonce:
            # FIXME When connection is tunneled, there's no need to add this header for each request
            r.headers['Proxy-Authorization'] = self.build_digest_header(r.method, r.url)
        try:
            self._thread_local.pos = r.body.tell()
        except AttributeError:
            # In the case of HTTPDigestAuth being reused and the body of
            # the previous request was a file-like object, pos has the
            # file position of the previous body. Ensure it's set to
            # None.
            self._thread_local.pos = None
        r.register_hook('response', self.handle_401)
        r.register_hook('response', self.handle_redirect)
        self._thread_local.num_401_calls = 1

        if self.auth is not None:
            # Forward to site auth
            r = self.auth(r)

        return r
# The following code changes the behavior of urllib3 and httplib/http.client to be able to forward
# proxy authentication error as a HTTP response (instead of an I/O exception) when tunneling
# a HTTPS connection. The main idea is to make httplib/http.client believe the proxy response
# is the webserver response, so that it forwards it to urllib3.
# The changes are:
# - The httplib.HTTPConnection._tunnel() method is called when there is a need to proxy
# HTTPS requests. If the proxy reports an authentication error (407), we raise a ProxyError.
# As we cannot alter the _tunnel() method without re-coding it, we change the class for HTTP
# response with our own (HTTPProxyResponse), and this is that class that will raise the
# ProxyError when parsing the HTTP status line from proxy. Raising a ProxyError prevents the
# _tunnel() method from consuming the proxy response (except its status line that we store
# in httplib.HTTPConnection to pretend later it comes from webserver).
# - The ProxyError is caught in the hooked urllib3.connection.HTTPSConnection.connect() method.
# In the exception handler, we replace the HTTP response class again with our own
# (BufferedHTTPResponse) so that we can forward the proxy status line as if it came from the
# webserver.
# - The exception prevents the establishment of the SSL/TLS communication, so the connection
# must not be used anymore after the processing of current request. This explains why the
# BufferedHTTPResponse._check_close() method always returns True.
# - Once the connection has been established, the request is sent to the webserver. Since
# we already know the proxy refused the connection, there is no need to send the request
# to webserver, so we drop it by hooking the urllib3.connection.HTTPConnection.send() method.
# - The BufferedHTTPResponse instance will be created in HTTPConnection.getresponse(). Once the
# stored status line has been read, the rest of the response is read directly from the socket
# as is the case when reading a webserver response (except the response comes from the proxy
# this time).
# - To make sure proxy authentication data is sent to proxy (and only it), we hook the
# urllib3.connectionpool.HTTPConnectionPool.urlopen() method to move Proxy-Authorization header
# from request to the dedicated list of proxy headers (proxy_headers property). These headers
# will be sent to proxy by the _tunnel() method.
def hook(cls, name=None):
    """Return a decorator that installs a function in place of an existing one.

    The decorated function is called with the replaced callable as an extra
    first argument, followed by the arguments of the call.

    :param cls: object owning the attribute to replace, or, when ``name`` is
        None, the name of a global of this module.
    :param name: attribute name on ``cls``; None selects the module-global mode.
    :return: decorator that installs and returns the wrapper.
    """
    import functools

    def _noop(*args, **kwargs):
        # Stand-in used when the attribute does not exist yet
        pass

    def decorator(func):
        patch_global = name is None
        if patch_global:
            # cls is the name of a module-level object
            namespace = globals()
            original = namespace[cls]
        else:
            original = getattr(cls, name, _noop)

        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            return func(original, *args, **kwargs)

        if patch_global:
            namespace[cls] = wrapper
        else:
            setattr(cls, name, wrapper)
        return wrapper

    return decorator
def hook_trace(cls, name=None):
    """Install a hook that emits a debug trace line, then calls the original.

    :param cls: object owning the attribute, or a global name when ``name`` is None.
    :param name: attribute name on ``cls``, or None.
    """
    if name is None:
        label = cls
    else:
        label = cls.__name__ + '.' + name

    def print_trace(original, *args, **kwargs):
        dbg_print('TRACE<%s>', label)
        return original(*args, **kwargs)

    install = hook(cls, name)
    install(print_trace)
class ProxyError(RuntimeError):
    """Raised while reading the proxy status line when the proxy answers 407."""
class HTTPProxyResponse(httplib.HTTPResponse):
    """Response used for the proxy's CONNECT reply; raises ProxyError on 407."""

    # (version, status, reason) recorded when the proxy answered 407
    _status_line = None

    def __init__(self, sock, *args, **kwargs):
        httplib.HTTPResponse.__init__(self, sock, *args, **kwargs)
        # This is the proxy response (for CONNECT), make sure it is not buffered
        # as we want to pretend this is the server response in case of 407
        running_py2 = sys.version_info[0] == 2
        if running_py2:
            unbuffered = sock.makefile('rb', 0)
        else:
            unbuffered = sock.makefile("rb", buffering=0)
        self.fp = unbuffered

    def _read_status(self):
        """Read the status line; store it and raise ProxyError if it is a 407."""
        (version, status, reason) = httplib.HTTPResponse._read_status(self)
        status_line = (version, status, reason)
        dbg_print(str(status_line))
        if status != 407:
            return status_line
        # We need to forward error from proxy
        self._status_line = status_line
        raise ProxyError()
class BufferedHTTPResponse(httplib.HTTPResponse):
    """Response that replays a stored status line instead of reading one."""

    # Status line tuple to hand back from _read_status(); set by the creator
    _status_line = None

    def _read_status(self):
        """Return the stored status line without reading from the socket."""
        dbg_print('%s', self.fp)
        return self._status_line

    def _check_close(self):
        """Always report that the connection must be closed."""
        inherited = httplib.HTTPResponse._check_close(self)
        dbg_print("_check_close => %s", inherited)
        # Make sure to close the connection because it is supposed to be encrypted
        # and we made sure it is not
        return True
# class Proxy(object):
# _target = None
#
# def __init__(self, target):
# self._target = target
#
# def __getattr__(self, name):
# if name == '_target': return super(Proxy, self).__getattr__(name)
#
# dbg_print('%s.%s', self._target, name)
# # import traceback; traceback.print_stack()
#
# return getattr(self._target, name)
#
# def __setattr__(self, name, value):
# if name == '_target': return super(Proxy, self).__setattr__(name, value)
# return setattr(self._target, name, value)
#
# def __delattr__(self, name):
# if name == '_target': return super(Proxy, self).__delattr__(name)
# return delattr(self._target, name)
#
# class ReaderProxy(Proxy):
# def readline(self, size=-1):
# dbg_print('%s.readline(%s)', self._target, size)
# return self._target.readline(size)
@hook(httplib.HTTPConnection, '_tunnel')
def HTTPConnection__tunnel_hook(original, self, *args, **kwargs):
    """Run ``_tunnel()`` with HTTPProxyResponse as the response class.

    If the response built during the call recorded a status line, it is copied
    to ``self._status_line``. The previous ``response_class`` is always restored.
    """
    built = []  # responses created while the original _tunnel() runs

    def make_response(*r_args, **r_kwargs):
        dbg_print('HTTPProxyResponse.__init__')
        response = HTTPProxyResponse(*r_args, **r_kwargs)
        built.append(response)
        return response

    saved_class = self.response_class
    self.response_class = make_response
    try:
        original(self, *args, **kwargs)
    finally:
        latest = built[-1] if built else None
        if latest is not None and latest._status_line is not None:
            # Copy status line to send it to client later
            self._status_line = latest._status_line
        # Undo our changes
        self.response_class = saved_class
# @hook(urllib3.connection.HTTPConnection, '_new_conn')
# def urllib3_connection_HTTPConnection__new_conn_hook(original, self, *args, **kwargs):
# if self._sock is not None:
# # Connection to proxy that refused first CONNECT
# return self._sock
# return original(self, *args, **kwargs)
@hook(urllib3.connection.HTTPSConnection, 'connect')
def urllib3_connection_HTTPSConnection_connect_hook(original, self, *args, **kwargs):
    """Call ``connect()``; on ProxyError arrange for the proxy reply to be forwarded.

    After a ProxyError, the next response built through ``self.response_class``
    is a BufferedHTTPResponse carrying ``self._status_line``; building it also
    restores the previous ``response_class``.
    """
    # Reset
    self._status_line = None
    try:
        original(self, *args, **kwargs)
    except ProxyError:
        # Raising and catching this exception prevent the connection from being encrypted
        dbg_print("ProxyError")
        # NOTE(review): presumably set so urllib3 treats the connection as
        # verified even though no TLS handshake happened -- confirm.
        self.is_verified = True

        # HTTPConnection.getresponse must process proxy response (not site)
        # This can be done by changing response_class in case of proxy error so that the
        # new response will forward the already read HTTP status and continue parsing
        # the response body from proxy as if it comes from client
        # We should only do that for 4xx and 5xx status to prevent abuse from proxy
        previous_class = self.response_class

        def make_response(*r_args, **r_kwargs):
            dbg_print('BufferedHTTPResponse.__init__')
            response = BufferedHTTPResponse(*r_args, **r_kwargs)
            response._status_line = self._status_line
            # One-shot: put the regular response class back
            self.response_class = previous_class
            return response

        self.response_class = make_response
@hook(urllib3.connection.HTTPConnection, 'send')
def urllib3_connection_HTTPConnection_send_hook(original, self, *args, **kwargs):
    """Drop outgoing data once a proxy status line has been recorded.

    While ``self._status_line`` is None the call is forwarded unchanged and its
    result returned. Otherwise nothing is sent and None is returned.
    """
    if self._status_line is None:
        return original(self, *args, **kwargs)

    # Prevent sending anything more to proxy.
    # Work out the size for the trace defensively: the payload may have been
    # passed by keyword or may not support len(), and tracing must never be
    # the reason a request fails.
    data = args[0] if args else kwargs.get('data')
    try:
        size = len(data)
    except TypeError:
        size = -1
    dbg_print("send(%d)", size)
    return None
@hook(urllib3.connection.HTTPConnection, '__init__')
def urllib3_connection_HTTPConnection___init___hook(original, self, *args, **kwargs):
    """Give each new connection a ``_status_line`` attribute set to None."""
    # Must exist before the send/connect hooks read it
    self._status_line = None
    outcome = original(self, *args, **kwargs)
    return outcome
@hook(urllib3.connectionpool.HTTPConnectionPool, 'urlopen')
def urllib3_connectionpool_HTTPConnectionPool_urlopen_hook(original, self, *args, **kwargs):
    """Move any Proxy-Authorization header into ``self.proxy_headers``.

    The header is removed from the request headers (the ``headers`` keyword, or
    ``self.headers`` when that keyword is absent or None) before the original
    ``urlopen`` is called.

    NOTE: headers passed positionally are not inspected.
    """
    headers = kwargs.get('headers')
    if headers is None:
        # An explicit headers=None would otherwise fail on .items() below
        headers = self.headers

    # Put proxy headers in proxy_headers.
    # Iterate over a snapshot because entries are deleted inside the loop.
    for (name, value) in list(headers.items()):
        if name.lower() == 'proxy-authorization':
            del headers[name]
            self.proxy_headers[name] = value
            dbg_print("<PROXY> %s: %s", name, value)

    return original(self, *args, **kwargs)
# @hook(urllib3.connectionpool.HTTPConnectionPool, '_get_conn')
# def urllib3_connectionpool_HTTPConnectionPool__get_conn_hook(original, self, *args, **kwargs):
# conn = original(self, *args, **kwargs)
# if conn._status_line is not None:
# # Pretend socket is not defined so that SSL/TLS connection is setup again
# # if proxy does not refuse password
# conn._sock = conn.sock
# conn.sock = None
#
# return conn
# hook_trace(httplib.HTTPConnection, 'close')
#!/usr/bin/env python
from requests_digest_proxy import HTTPProxyDigestAuth
import sys
import requests
def test():
    """Run a series of requests through a digest-authenticated proxy.

    Command-line arguments: proxy URL, proxy user, proxy password. Prints the
    body of each response, or an error line when the status is not 200.
    Exits with status 1 when fewer than three arguments are given.
    """
    # http://httpbin.org/basic-auth/user1/password0
    # http://httpbin.org/digest-auth/auth/user1/password0
    if len(sys.argv) < 4:
        sys.stderr.write('Syntax: %s proxy user password\n' % sys.argv[0])
        sys.stderr.write('Example: %s http://1.2.3.4:8080/ user1 user1_password\n' % sys.argv[0])
        sys.exit(1)

    proxy_url, proxy_user, proxy_password = sys.argv[1:4]
    proxies = {
        'http': proxy_url,
        'https': proxy_url,
    }

    def session(site_auth=None):
        # Fresh session whose auth handles the proxy and, optionally, the site
        s = requests.Session()
        s.proxies = proxies
        s.auth = HTTPProxyDigestAuth(proxy_user, proxy_password, auth=site_auth)
        return s

    def result(resp):
        if resp.status_code != 200:
            return "[ERROR] %d - %s" % (resp.status_code, resp.reason)
        return resp.text

    def fetch(s, label, url):
        print(label)
        print(result(s.get(url)))

    s = session()
    fetch(s, "IP through HTTP (new session):", 'http://httpbin.org/ip')
    fetch(s, "Headers through HTTP (same session):", 'http://httpbin.org/headers')

    s = session()
    fetch(s, "IP through HTTPS (new session):", 'https://httpbin.org/ip')
    fetch(s, "Headers through HTTPS (same session):", 'https://httpbin.org/headers')

    # Website authentication on top of proxy authentication, one new session each
    site_auth_cases = (
        ("Basic Auth through HTTP (new session):",
         requests.auth.HTTPBasicAuth,
         'http://httpbin.org/basic-auth/user1/password0'),
        ("Digest Auth through HTTP (new session):",
         requests.auth.HTTPDigestAuth,
         'http://httpbin.org/digest-auth/auth/user1/password0'),
        ("Basic Auth through HTTPS (new session):",
         requests.auth.HTTPBasicAuth,
         'https://httpbin.org/basic-auth/user1/password0'),
        ("Digest Auth through HTTPS (new session):",
         requests.auth.HTTPDigestAuth,
         'https://httpbin.org/digest-auth/auth/user1/password0'),
    )
    for label, auth_class, url in site_auth_cases:
        fetch(session(auth_class('user1', 'password0')), label, url)


# Only hit the network when run as a script, not when imported
if __name__ == '__main__':
    test()
This is free and unencumbered software released into the public domain.
Anyone is free to copy, modify, publish, use, compile, sell, or
distribute this software, either in source code form or as a compiled
binary, for any purpose, commercial or non-commercial, and by any
means.
In jurisdictions that recognize copyright laws, the author or authors
of this software dedicate any and all copyright interest in the
software to the public domain. We make this dedication for the benefit
of the public at large and to the detriment of our heirs and
successors. We intend this dedication to be an overt act of
relinquishment in perpetuity of all present and future rights to this
software under copyright law.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR
OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
OTHER DEALINGS IN THE SOFTWARE.
For more information, please refer to <https://unlicense.org>
@MarvinWinks
Copy link

@Tey: Thanks very much for sharing this. It works as advertised. Can you post usage/license comments on this? Is it free to use?

@Tey
Copy link
Author

Tey commented Sep 28, 2020

@MarvinWinks I've added the UNLICENSE file so that there is no more ambiguity about the license.

@MarvinWinks
Copy link

MarvinWinks commented Sep 28, 2020 via email

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment