Skip to content

Instantly share code, notes, and snippets.

@tomchristie
Last active October 6, 2020 15:48
Show Gist options
  • Save tomchristie/e6e5e06b6c7b77b9c726b74d1ba0acbc to your computer and use it in GitHub Desktop.
Save tomchristie/e6e5e06b6c7b77b9c726b74d1ba0acbc to your computer and use it in GitHub Desktop.
class Client():
...
def request(
self,
method: str,
url: URLTypes,
*,
content: RequestContent = None,
data: RequestData = None,
files: RequestFiles = None,
json: typing.Any = None,
params: QueryParamTypes = None,
headers: HeaderTypes = None,
cookies: CookieTypes = None,
auth: typing.Union[AuthTypes, UnsetType] = UNSET,
allow_redirects: bool = True,
timeout: typing.Union[TimeoutTypes, UnsetType] = UNSET,
) -> Response:
"""
Build and send a request.
Equivalent to:
```python
request = client.build_request(...)
with client.send(request, ...) as response:
response.read()
```
See `Client.build_request()`, `Client.send()` and
[Merging of configuration][0] for how the various parameters
are merged with client-level configuration.
[0]: /advanced/#merging-of-configuration
"""
request = self.build_request(
method=method,
url=url,
content=content,
data=data,
files=files,
json=json,
params=params,
headers=headers,
cookies=cookies,
)
with self.send(
request=request,
auth=auth,
allow_redirects=allow_redirects,
timeout=timeout,
) as response:
response.read()
return response
@contextlib.contextmanager
def stream(
self,
method: str,
url: URLTypes,
*,
content: RequestContent = None,
data: RequestData = None,
files: RequestFiles = None,
json: typing.Any = None,
params: QueryParamTypes = None,
headers: HeaderTypes = None,
cookies: CookieTypes = None,
auth: typing.Union[AuthTypes, UnsetType] = UNSET,
allow_redirects: bool = True,
timeout: typing.Union[TimeoutTypes, UnsetType] = UNSET,
) -> typing.ContextManager[Response]:
"""
Alternative to `httpx.request()` that streams the response body
instead of loading it into memory at once.
**Parameters**: See `httpx.request`.
See also: [Streaming Responses][0]
[0]: /quickstart#streaming-responses
"""
request = self.build_request(
method=method,
url=url,
content=content,
data=data,
files=files,
json=json,
params=params,
headers=headers,
cookies=cookies,
)
with self.send(
request=request,
auth=auth,
allow_redirects=allow_redirects,
timeout=timeout,
) as response:
yield response
@contextlib.contextmanager
def send(
self,
request: Request,
*,
auth: typing.Union[AuthTypes, UnsetType] = UNSET,
allow_redirects: bool = True,
timeout: typing.Union[TimeoutTypes, UnsetType] = UNSET,
) -> typing.ContextManager[Response]:
"""
Send a request.
The request is sent as-is, unmodified.
Typically you'll want to build one with `Client.build_request()`
so that any client-level configuration is merged into the request,
but passing an explicit `httpx.Request()` is supported as well.
See also: [Request instances][0]
[0]: /advanced/#request-instances
"""
if self._state == ClientState.CLOSED:
raise RuntimeError("Cannot send a request, as the client has been closed.")
self._state = ClientState.OPENED
timeout = self.timeout if isinstance(timeout, UnsetType) else Timeout(timeout)
auth = self._build_request_auth(request, auth)
with self._send_handling_auth(
request,
auth=auth,
timeout=timeout,
allow_redirects=allow_redirects,
history=[],
) as response:
for hook in self._event_hooks["response"]:
hook(response)
yield response
@contextlib.contextmanager
def _send_handling_auth(
self,
request: Request,
auth: Auth,
timeout: Timeout,
allow_redirects: bool,
history: typing.List[Response],
) -> typing.ContextManager[Response]:
auth_flow = auth.sync_auth_flow(request)
request = next(auth_flow)
for hook in self._event_hooks["request"]:
hook(request)
while True:
with self._send_handling_redirects(
request,
timeout=timeout,
allow_redirects=allow_redirects,
history=history,
) as response:
try:
next_request = auth_flow.send(response)
except StopIteration:
yield response
break
response.history = list(history)
response.read()
request = next_request
history.append(response)
@contextlib.contextmanager
def _send_handling_redirects(
self,
request: Request,
timeout: Timeout,
allow_redirects: bool,
history: typing.List[Response],
) -> typing.ContextManager[Response]:
while True:
if len(history) > self.max_redirects:
raise TooManyRedirects(
"Exceeded maximum allowed redirects.", request=request
)
with self._send_single_request(request, timeout) as response:
response.history = list(history)
if not response.is_redirect:
yield response
break
request = self._build_redirect_request(request, response)
history = history + [response]
if allow_redirects:
response.read()
else:
response.next_request = request
yield response
break
@contextlib.contextmanager
def _send_single_request(self, request: Request, timeout: Timeout) -> typing.ContextManager[Response]:
"""
Sends a single request, without handling any redirections.
"""
transport = self._transport_for_url(request.url)
timer = Timer()
timer.sync_start()
with map_exceptions(HTTPCORE_EXC_MAP, request=request):
with transport.request(
request.method.encode(),
request.url.raw,
headers=request.headers.raw,
stream=request.stream, # type: ignore
ext={"timeout": timeout.as_dict()},
) as (status_code, headers, stream, ext):
response = Response(
status_code,
headers=headers,
stream=stream, # type: ignore
ext=ext,
request=request,
)
self.cookies.extract_cookies(response)
status = f"{response.status_code} {response.reason_phrase}"
response_line = f"{response.http_version} {status}"
logger.debug(f'HTTP Request: {request.method} {request.url} "{response_line}"')
yield response
response.elapsed = datetime.timedelta(seconds=timer.sync_elapsed())
import contextlib
import httpx
class HTTPTransport:
@contextlib.contextmanager
def request(self, method, url, headers, stream, ext):
yield 200, {}, [], {}
def close(self):
pass
c = httpx.Client(transport=HTTPTransport())
r = c.get('https://www.example.org/')
print(r)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment