Skip to content

Instantly share code, notes, and snippets.

@RicterZ
Last active April 11, 2022 08:28
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
Save RicterZ/40d79feb4173b47c869401890eeab89f to your computer and use it in GitHub Desktop.
import requests
from urllib3.connectionpool import *
from urllib3.connectionpool import _Default, _encode_target
from requests.adapters import HTTPAdapter
class FixedHTTPConnectionPool(HTTPConnectionPool):
    """HTTPConnectionPool whose request line is forced to a pre-set raw URL.

    Purpose: normally urllib3 re-encodes the request target and drops the
    fragment.  This subclass stores a raw URI (``_url``, typically
    ``path#fragment``) at construction time and substitutes it for the
    encoded target inside ``urlopen``, so the fragment is sent on the wire.

    ``urlopen`` below is otherwise a near-verbatim copy of urllib3's
    ``HTTPConnectionPool.urlopen`` — the only functional change is the
    ``url = self._url`` line.  NOTE(review): copied from a specific urllib3
    version (appears to be 1.26.x, given ``connection_requires_http_tunnel``
    and ``proxy_config``); confirm the installed urllib3 matches, since this
    bypasses upstream fixes.
    """

    # Raw URL override injected by FixedHTTPAdapter (class-level default).
    _url = ''

    def __init__(self, *args, **kwargs):
        # Pop our private kwarg before delegating so the parent
        # constructor does not see an unexpected keyword.
        self._url = kwargs.pop('_url')
        super(FixedHTTPConnectionPool, self).__init__(*args, **kwargs)

    def urlopen(self, method, url, body=None, headers=None, retries=None, redirect=True,
                assert_same_host=True, timeout=_Default, pool_timeout=None, release_conn=None,
                chunked=False, body_pos=None, **response_kw):
        """Get a connection from the pool and perform an HTTP request,
        using ``self._url`` (raw, fragment included) as the request target.

        Parameters/returns/raises follow urllib3's
        ``HTTPConnectionPool.urlopen`` contract; see upstream docs.
        """
        # The *original* url is still parsed for scheme/host checks; only
        # the request target is overridden later.
        parsed_url = parse_url(url)
        destination_scheme = parsed_url.scheme
        if headers is None:
            headers = self.headers
        if not isinstance(retries, Retry):
            retries = Retry.from_int(retries, redirect=redirect, default=self.retries)
        if release_conn is None:
            # Mirror preload_content: if the body is preloaded we can
            # release the connection back to the pool immediately.
            release_conn = response_kw.get("preload_content", True)
        # Check host
        if assert_same_host and not self.is_same_host(url):
            raise HostChangedError(self, url, retries)
        # Patch point: replace the (would-be re-encoded) target with the
        # raw URI captured at construction time, preserving the fragment.
        url = self._url
        conn = None
        # Track whether this call should return the connection to the pool
        # in ``finally`` (may flip to True on an unclean exit).
        release_this_conn = release_conn
        http_tunnel_required = connection_requires_http_tunnel(
            self.proxy, self.proxy_config, destination_scheme
        )
        if not http_tunnel_required:
            # Forwarding proxy (no CONNECT): merge proxy headers into a
            # copy so the caller's dict is not mutated.
            headers = headers.copy()
            headers.update(self.proxy_headers)
        err = None
        clean_exit = False
        # Rewind support for file-like bodies on retry.
        body_pos = set_file_position(body, body_pos)
        try:
            timeout_obj = self._get_timeout(timeout)
            conn = self._get_conn(timeout=pool_timeout)
            conn.timeout = timeout_obj.connect_timeout
            is_new_proxy_conn = self.proxy is not None and not getattr(
                conn, "sock", None
            )
            if is_new_proxy_conn and http_tunnel_required:
                # Fresh proxied socket that needs a CONNECT tunnel first.
                self._prepare_proxy(conn)
            httplib_response = self._make_request(
                conn,
                method,
                url,
                timeout=timeout_obj,
                body=body,
                headers=headers,
                chunked=chunked,
            )
            # Only hand the connection to the response object if we are
            # NOT releasing it here — exactly one owner.
            response_conn = conn if not release_conn else None
            response_kw["request_method"] = method
            response = self.ResponseCls.from_httplib(
                httplib_response,
                pool=self,
                connection=response_conn,
                retries=retries,
                **response_kw
            )
            # Everything went great!
            clean_exit = True
        except EmptyPoolError:
            # Didn't get a connection from the pool, no need to clean up
            clean_exit = True
            release_this_conn = False
            raise
        except (
            TimeoutError,
            HTTPException,
            SocketError,
            ProtocolError,
            BaseSSLError,
            SSLError,
            CertificateError,
        ) as e:
            # Low-level failure: discard the (possibly broken) connection
            # and translate to urllib3's exception taxonomy before asking
            # the Retry policy whether to try again.
            clean_exit = False
            if isinstance(e, (BaseSSLError, CertificateError)):
                e = SSLError(e)
            elif isinstance(e, (SocketError, NewConnectionError)) and self.proxy:
                e = ProxyError("Cannot connect to proxy.", e)
            elif isinstance(e, (SocketError, HTTPException)):
                e = ProtocolError("Connection aborted.", e)
            # Raises MaxRetryError when the retry budget is exhausted.
            retries = retries.increment(
                method, url, error=e, _pool=self, _stacktrace=sys.exc_info()[2]
            )
            retries.sleep()
            err = e
        finally:
            if not clean_exit:
                # Close the broken connection; put None back in the pool
                # slot so pool bookkeeping stays balanced.
                conn = conn and conn.close()
                release_this_conn = True
            if release_this_conn:
                self._put_conn(conn)
        if not conn:
            # Connection broke mid-request: recurse with the (already
            # incremented) retries object.
            log.warning(
                "Retrying (%r) after connection broken by '%r': %s", retries, err, url
            )
            return self.urlopen(
                method,
                url,
                body,
                headers,
                retries,
                redirect,
                assert_same_host,
                timeout=timeout,
                pool_timeout=pool_timeout,
                release_conn=release_conn,
                chunked=chunked,
                body_pos=body_pos,
                **response_kw
            )
        # Handle redirection, honoring the Retry policy's redirect budget.
        redirect_location = redirect and response.get_redirect_location()
        if redirect_location:
            if response.status == 303:
                # 303 See Other: follow-up must be a GET.
                method = "GET"
            try:
                retries = retries.increment(method, url, response=response, _pool=self)
            except MaxRetryError:
                if retries.raise_on_redirect:
                    response.drain_conn()
                    raise
                return response
            response.drain_conn()
            retries.sleep_for_retry(response)
            log.debug("Redirecting %s -> %s", url, redirect_location)
            return self.urlopen(
                method,
                redirect_location,
                body,
                headers,
                retries=retries,
                redirect=redirect,
                assert_same_host=assert_same_host,
                timeout=timeout,
                pool_timeout=pool_timeout,
                release_conn=release_conn,
                chunked=chunked,
                body_pos=body_pos,
                **response_kw
            )
        # Check if we should retry the HTTP response.
        has_retry_after = bool(response.getheader("Retry-After"))
        if retries.is_retry(method, response.status, has_retry_after):
            try:
                retries = retries.increment(method, url, response=response, _pool=self)
            except MaxRetryError:
                if retries.raise_on_status:
                    response.drain_conn()
                    raise
                return response
            response.drain_conn()
            # Honors Retry-After when present.
            retries.sleep(response)
            log.debug("Retry: %s", url)
            return self.urlopen(
                method,
                url,
                body,
                headers,
                retries=retries,
                redirect=redirect,
                assert_same_host=assert_same_host,
                timeout=timeout,
                pool_timeout=pool_timeout,
                release_conn=release_conn,
                chunked=chunked,
                body_pos=body_pos,
                **response_kw
            )
        return response
class FixedHTTPAdapter(HTTPAdapter):
    """requests transport adapter that routes every request through a
    FixedHTTPConnectionPool so the URL fragment survives into the
    request line.

    Only plain HTTP is supported; HTTPS raises NotImplementedError.
    """

    # Raw URL the adapter was created for (class-level default).
    _url = ''

    def __init__(self, *args, **kwargs):
        # Pop our private kwarg before delegating so HTTPAdapter's
        # constructor does not reject an unexpected keyword.
        self._url = kwargs.pop('_url')
        super(FixedHTTPAdapter, self).__init__(*args, **kwargs)

    def get_connection(self, url, proxies=None):
        """Return a fragment-preserving pool for *url*.

        Bug fix: the original signature was ``(self, *args, **kwargs)``
        and the body read an undefined local ``url`` — it only worked by
        accidentally resolving to the module-level ``url`` global, so the
        URL requests actually passed in was ignored.  Restore the real
        ``HTTPAdapter.get_connection(url, proxies)`` signature.

        ``proxies`` is accepted for interface compatibility but unused —
        this adapter does not support proxying.
        """
        u = parse_url(url)
        # path#fragment becomes the pool's forced request target.
        uri = '{}#{}'.format(u.path, u.fragment)
        if u.scheme == 'https':
            raise NotImplementedError
        return FixedHTTPConnectionPool(host=u.host, port=u.port, _url=uri)

    def request_url(self, *args, **kwargs):
        # Force the request line to use the raw URL captured at
        # construction time instead of requests' re-encoded path.
        return self._url
class RequestSess(requests.Session):
    """requests.Session pinned to one URL: every request is dispatched
    through a FixedHTTPAdapter built for *u*, so the URL fragment is
    preserved on the wire.
    """

    # Target URL this session is bound to (class-level default).
    _url = ''

    def __init__(self, u):
        # Session.__init__ does not touch _url, so the order of these two
        # statements is interchangeable.
        super(RequestSess, self).__init__()
        self._url = u

    def get_adapter(self, *args, **kwargs):
        # Ignore the requested URL entirely — always hand back a fresh
        # adapter bound to the session's pinned URL.
        adapter = FixedHTTPAdapter(_url=self._url)
        return adapter
# Demonstration: GET a URL whose '#/test' fragment must be sent verbatim
# in the request line (normally urllib3/requests strip the fragment).
url = 'http://localhost:1234/#/test'
sess = RequestSess(url)
# NOTE(review): performs a live HTTP request at module import time —
# requires a server listening on localhost:1234; will raise a
# ConnectionError otherwise.  Consider guarding with __main__.
response = sess.get(url)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment