Skip to content

Instantly share code, notes, and snippets.

@HacKanCuBa
Last active January 2, 2020 00:03
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save HacKanCuBa/28e7c08b967422fe04f2d057086216c8 to your computer and use it in GitHub Desktop.
Save HacKanCuBa/28e7c08b967422fe04f2d057086216c8 to your computer and use it in GitHub Desktop.
"""Wrappers around Python 3 Requests library.
This lib will log errors, warnings and request duration, not raising any
exception: in such error cases, an empty dict is returned. To identify, if
necessary, that there were errors, a with_error flag must be set in the
arguments so that the methods return a tuple in the form of
(response_data: any, error: bool).
If there's any response expected from the endpoint, it will be returned
JSON-converted as-is, which means it's either valid JSON (string, number,
list, dict) or an empty dict (default response value, which is still valid
JSON).
"""
import asyncio
import functools
import logging
import typing
from time import time
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
# Module metadata.
__version__ = '0.7.3'
__author__ = 'HacKan (https://hackan.net)'
__license__ = 'GPL-3+'
__url__ = 'https://gist.github.com/HacKanCuBa/28e7c08b967422fe04f2d057086216c8'

# Module-level logger used by every wrapper below.
logger = logging.getLogger(__name__)

# Anything a decoded JSON document can be: object, array, number (int or
# float), boolean, string, or null (None).
TJSONData = typing.Optional[typing.Union[dict, list, int, float, bool, str]]
def retry_connector(
        *,
        retries: int = 3,
        backoff_factor: float = 0.3,
        status_forcelist: typing.Sequence[int] = (500, 502, 504),
        session: typing.Optional[requests.Session] = None,
) -> requests.Session:
    """Build (or extend) a session whose HTTP(S) adapters retry automatically.

    :Author: Peter Bengtsson (https://www.peterbe.com)
    :URL: https://www.peterbe.com/plog/best-practice-with-retries-with-requests

    :param retries: Used as the total, read and connect retry budget.
    :param backoff_factor: Backoff factor handed to the retry policy.
    :param status_forcelist: HTTP status codes handed to the retry policy.
    :param session: Existing session to configure; a new one is created
                    when none is given.
    :return: The session, with the retrying adapter mounted for both
             ``http://`` and ``https://``.
    """
    connector = session if session else requests.Session()
    policy = Retry(
        total=retries,
        read=retries,
        connect=retries,
        backoff_factor=backoff_factor,
        status_forcelist=status_forcelist,
    )
    retrying_adapter = HTTPAdapter(max_retries=policy)
    # The same adapter instance serves both schemes.
    for scheme in ('http://', 'https://'):
        connector.mount(scheme, retrying_adapter)
    return connector
class SimpleResponse(typing.NamedTuple):
    """Outcome of a raw request: the response object plus an error flag."""

    # The `requests` response, or None when no response was obtained.
    response: typing.Optional[requests.Response]
    # True when an exception was caught while performing the request.
    exception: bool
class SimpleDataResponse(typing.NamedTuple):
    """Outcome of a JSON request: decoded data, error flag and HTTP status."""

    # JSON-decoded response body, or None when there is nothing decodable.
    response_data: TJSONData
    # True when an exception was caught while performing the request.
    exception: bool
    # HTTP status code of the response, or None when there is no response.
    response_status: typing.Optional[int]
class SimpleRequest:
    """Wrapper over requests lib that catches and logs errors and connection times."""

    # Default for the `verify` argument of `requests`; only applied when the
    # caller does not pass `verify` explicitly.
    VERIFY_SSL: bool = True

    @classmethod
    def request(
            cls,
            method: str,
            url: str,
            *,
            timeout: typing.Union[int, float],
            retries: typing.Optional[int] = None,
            backoff_factor: float = 0.3,
            status_forcelist: typing.Optional[typing.Sequence[int]] = None,
            **kwargs,
    ) -> SimpleResponse:
        """Make a request to an endpoint, return the response if any.

        Request time, errors and exceptions are all logged using the standard
        logger.

        :param method: The request method as string, such as GET, POST, PUT, etc.
        :param url: Endpoint URL as string.
        :param timeout: Connection timeout in seconds. A value of 0 (or any
                        non-positive value) means no timeout is passed to
                        `requests`.
        :param retries: Any number bigger than 0 implies usage of a retry strategy.
        :param backoff_factor: Factor to apply to wait time for retries.
        :param status_forcelist: Set of status codes over 500 that you want to
                                 forcibly retry. Only used with a retry strategy.
        :param kwargs: Additional arguments for `requests`.
        :return: An object with the response (if any) and a bool representing
                 the occurrence of an exception.
        """
        exception = False
        response = None
        if timeout > 0:
            kwargs['timeout'] = timeout
        if 'verify' not in kwargs:
            kwargs['verify'] = cls.VERIFY_SSL

        # A dedicated session is only needed to carry the retry adapters; it
        # is owned by this call and must be closed once the request is done.
        session = None
        if retries:
            session = retry_connector(
                retries=retries,
                backoff_factor=backoff_factor,
                status_forcelist=status_forcelist,
            )
        connector = requests if session is None else session

        request_time_start = time()
        try:
            response = connector.request(method, url, **kwargs)
            request_time_end = time()
        except (
                requests.exceptions.ConnectionError,
                requests.exceptions.ReadTimeout,
                # Raised once the retry strategy is exhausted because of
                # status codes listed in status_forcelist.
                requests.exceptions.RetryError,
        ):
            request_time_end = time()
            # NOTE(review): kwargs may contain credentials (headers, auth,
            # payload) and are logged verbatim here and below.
            logger.exception(
                'Error [%s]ing data (kwargs: %s) to/from the endpoint (url: %s)',
                method,
                str(kwargs),
                url,
            )
            exception = True
        else:
            if not response.ok:
                logger.warning(
                    'Response from endpoint %s (kwargs: %s) is NOT OK: %d %s',
                    url,
                    str(kwargs),
                    response.status_code,
                    response.text,
                )
        finally:
            # Release the connection pool of the session created above, the
            # same way `requests.request` does with its internal session.
            if session is not None:
                session.close()

        logger.debug(
            'Request to endpoint %s took %.2f seconds',
            url,
            request_time_end - request_time_start,
        )
        return SimpleResponse(response, exception)

    @classmethod
    async def aiorequest(
            cls,
            method: str,
            url: str,
            *,
            timeout: typing.Union[int, float],
            retries: typing.Optional[int] = None,
            backoff_factor: float = 0.3,
            status_forcelist: typing.Optional[typing.Sequence[int]] = None,
            **kwargs,
    ) -> SimpleResponse:
        """Asynchronously make a request to an endpoint, return the response if any.

        The blocking `request` method is run in the running loop's default
        executor. Request time, errors and exceptions are all logged using
        the standard logger.

        :param method: The request method as string, such as GET, POST, PUT, etc.
        :param url: Endpoint URL as string.
        :param timeout: Connection timeout in seconds. A value of 0 (or any
                        non-positive value) means no timeout is passed to
                        `requests`.
        :param retries: Any number bigger than 0 implies usage of a retry strategy.
        :param backoff_factor: Factor to apply to wait time for retries.
        :param status_forcelist: Set of status codes over 500 that you want to
                                 forcibly retry. Only used with a retry strategy.
        :param kwargs: Additional arguments for `requests`.
        :return: An object with the response (if any) and a bool representing
                 the occurrence of an exception.
        """
        loop = asyncio.get_running_loop()
        # run_in_executor takes positional arguments only, hence the partial.
        blocking_call = functools.partial(
            cls.request,
            method,
            url,
            timeout=timeout,
            retries=retries,
            backoff_factor=backoff_factor,
            status_forcelist=status_forcelist,
            **kwargs,
        )
        response, exception = await loop.run_in_executor(None, blocking_call)
        return SimpleResponse(response, exception)
class JSONConnector:
    """Generic requests wrapper class to handle JSON endpoints."""

    @staticmethod
    def request(
            method: str,
            url: str,
            *,
            timeout: typing.Union[int, float],
            retries: typing.Optional[int] = None,
            backoff_factor: float = 0.3,
            status_forcelist: typing.Optional[typing.Sequence[int]] = None,
            **kwargs,
    ) -> SimpleDataResponse:
        """Make a request to a JSON endpoint, return the JSON converted response if any.

        Request time, errors and exceptions are all logged using the standard
        logger.

        Note that the type of the returned response depends on the endpoint,
        but it will always be some valid JSON. It is None when an exception
        occurred or when the response body could not be decoded as JSON.

        To know whether an exception occurred or not check the exception property
        of the return value.

        :param method: The request method as string, such as GET, POST, PUT, etc.
        :param url: Endpoint URL as string.
        :param timeout: Connection timeout in seconds (0 for inf).
        :param retries: Any number bigger than 0 implies usage of a retry strategy.
        :param backoff_factor: Factor to apply to wait time for retries.
        :param status_forcelist: Set of status codes over 500 that you want to
                                 forcibly retry.
        :param kwargs: Additional arguments for `requests`. The `content-type`
                       header is always set to `application/json`.
        :return: An object with the response data (if any), a bool representing
                 the occurrence of an exception and the response HTTP status code
                 (None if an exception occurred).
        """
        response_data = None
        # Work on a copy so the caller's headers mapping is never modified.
        caller_headers = kwargs.get('headers')
        headers = caller_headers.copy() if caller_headers else {}
        headers['content-type'] = 'application/json'
        kwargs['headers'] = headers

        response, exception = SimpleRequest.request(
            method,
            url,
            timeout=timeout,
            retries=retries,
            backoff_factor=backoff_factor,
            status_forcelist=status_forcelist,
            **kwargs,
        )
        if exception:
            status_code = None
        else:
            status_code = response.status_code
            try:
                response_data = response.json()
            except ValueError:
                if response.text:  # Could be an empty response
                    logger.warning(
                        'Response from endpoint %s is not valid JSON: %d %s',
                        url,
                        response.status_code,
                        response.text,
                    )
        return SimpleDataResponse(response_data, exception, status_code)

    @classmethod
    def get(
            cls,
            url: str,
            *,
            timeout: typing.Union[int, float],
            retries: typing.Optional[int] = None,
            backoff_factor: float = 0.3,
            status_forcelist: typing.Optional[typing.Sequence[int]] = None,
            **kwargs,
    ) -> SimpleDataResponse:
        """Retrieve data from a JSON endpoint, return the JSON converted response if any.

        Thin wrapper over `request` using the GET method. Request time, errors
        and exceptions are all logged using the standard logger.

        Note that the type of the returned response depends on the endpoint,
        but it will always be some valid JSON.

        To know whether an exception occurred or not check the exception property
        of the return value.

        :param url: Endpoint URL as string.
        :param timeout: Connection timeout in seconds (0 for inf).
        :param retries: Any number bigger than 0 implies usage of a retry strategy.
        :param backoff_factor: Factor to apply to wait time for retries.
        :param status_forcelist: Set of status codes over 500 that you want to
                                 forcibly retry.
        :param kwargs: Additional arguments for `requests`.
        :return: An object with the response data (if any), a bool representing
                 the occurrence of an exception and the response HTTP status code.
        """
        return cls.request(
            'GET',
            url,
            timeout=timeout,
            retries=retries,
            backoff_factor=backoff_factor,
            status_forcelist=status_forcelist,
            **kwargs,
        )

    @classmethod
    def post(
            cls,
            url: str,
            data: typing.Union[str, dict, bytes, list, tuple],
            *,
            timeout: typing.Union[int, float],
            retries: typing.Optional[int] = None,
            backoff_factor: float = 0.3,
            status_forcelist: typing.Optional[typing.Sequence[int]] = None,
            **kwargs,
    ) -> SimpleDataResponse:
        """Post data to a JSON endpoint, return the JSON converted response if any.

        A dict, list or tuple is sent through the `json` argument of `requests`.
        If given data is a string, it will be previously encoded as if it were UTF-8.
        It is recommended to not send strings but encoded ones as bytes.
        Anything else (such as bytes) is sent unchanged as the request body.

        Request time, errors and exceptions are all logged using the standard
        logger.

        Note that the type of the returned response depends on the endpoint,
        but it will always be some valid JSON.

        To know whether an exception occurred or not check the exception property
        of the return value.

        :param url: Endpoint URL as string.
        :param data: Data to post, either as a dictionary (JSON valid) or a string.
        :param timeout: Connection timeout in seconds (0 for inf).
        :param retries: Any number bigger than 0 implies usage of a retry strategy.
        :param backoff_factor: Factor to apply to wait time for retries.
        :param status_forcelist: Set of status codes over 500 that you want to
                                 forcibly retry.
        :param kwargs: Additional arguments for `requests`.
        :return: An object with the response data (if any), a bool representing
                 the occurrence of an exception and the response HTTP status code.
        """
        if isinstance(data, (dict, list, tuple)):
            payload = {'json': data}
        else:
            payload = {'data': data.encode() if isinstance(data, str) else data}
        # Unpacking both mappings keeps the duplicate-keyword TypeError if the
        # caller also passes `json`/`data` through kwargs.
        return cls.request(
            'POST',
            url,
            timeout=timeout,
            retries=retries,
            backoff_factor=backoff_factor,
            status_forcelist=status_forcelist,
            **payload,
            **kwargs,
        )
class JSONConnectorAsync:
    """Generic requests wrapper class to handle JSON endpoints asynchronously."""

    @staticmethod
    async def request(
            method: str,
            url: str,
            *,
            timeout: typing.Union[int, float],
            retries: typing.Optional[int] = None,
            backoff_factor: float = 0.3,
            status_forcelist: typing.Optional[typing.Sequence[int]] = None,
            **kwargs,
    ) -> SimpleDataResponse:
        """Make a request to a JSON endpoint, return the JSON converted response if any.

        Request time, errors and exceptions are all logged using the standard
        logger.

        Note that the type of the returned response depends on the endpoint,
        but it will always be some valid JSON. It is None when an exception
        occurred or when the response body could not be decoded as JSON.

        To know whether an exception occurred or not check the exception property
        of the return value.

        :param method: The request method as string, such as GET, POST, PUT, etc.
        :param url: Endpoint URL as string.
        :param timeout: Connection timeout in seconds (0 for inf).
        :param retries: Any number bigger than 0 implies usage of a retry strategy.
        :param backoff_factor: Factor to apply to wait time for retries.
        :param status_forcelist: Set of status codes over 500 that you want to
                                 forcibly retry.
        :param kwargs: Additional arguments for `requests`. The `content-type`
                       header is always set to `application/json`.
        :return: An object with the response data (if any), a bool representing
                 the occurrence of an exception and the response HTTP status code
                 (None if an exception occurred).
        """
        response_data = None
        # Work on a copy so the caller's headers mapping is never modified.
        caller_headers = kwargs.get('headers')
        headers = caller_headers.copy() if caller_headers else {}
        headers['content-type'] = 'application/json'
        kwargs['headers'] = headers

        response, exception = await SimpleRequest.aiorequest(
            method,
            url,
            timeout=timeout,
            retries=retries,
            backoff_factor=backoff_factor,
            status_forcelist=status_forcelist,
            **kwargs,
        )
        if exception:
            status_code = None
        else:
            status_code = response.status_code
            try:
                response_data = response.json()
            except ValueError:
                if response.text:  # Could be an empty response
                    logger.warning(
                        'Response from endpoint %s is not valid JSON: %d %s',
                        url,
                        response.status_code,
                        response.text,
                    )
        return SimpleDataResponse(response_data, exception, status_code)

    @classmethod
    async def get(
            cls,
            url: str,
            *,
            timeout: typing.Union[int, float],
            retries: typing.Optional[int] = None,
            backoff_factor: float = 0.3,
            status_forcelist: typing.Optional[typing.Sequence[int]] = None,
            **kwargs,
    ) -> SimpleDataResponse:
        """Retrieve data from a JSON endpoint, return the JSON converted response if any.

        Thin wrapper over `request` using the GET method. Request time, errors
        and exceptions are all logged using the standard logger.

        Note that the type of the returned response depends on the endpoint,
        but it will always be some valid JSON.

        To know whether an exception occurred or not check the exception property
        of the return value.

        :param url: Endpoint URL as string.
        :param timeout: Connection timeout in seconds (0 for inf).
        :param retries: Any number bigger than 0 implies usage of a retry strategy.
        :param backoff_factor: Factor to apply to wait time for retries.
        :param status_forcelist: Set of status codes over 500 that you want to
                                 forcibly retry.
        :param kwargs: Additional arguments for `requests`.
        :return: An object with the response data (if any), a bool representing
                 the occurrence of an exception and the response HTTP status code.
        """
        return await cls.request(
            'GET',
            url,
            timeout=timeout,
            retries=retries,
            backoff_factor=backoff_factor,
            status_forcelist=status_forcelist,
            **kwargs,
        )

    # NOTE: retries, backoff_factor and status_forcelist are positional-or-
    # keyword here (they precede the `*`), whereas they are keyword-only in
    # JSONConnector.post. The signature is kept as-is so existing positional
    # callers keep working.
    @classmethod
    async def post(
            cls,
            url: str,
            data: typing.Union[str, dict, bytes, list, tuple],
            retries: typing.Optional[int] = None,
            backoff_factor: float = 0.3,
            status_forcelist: typing.Optional[typing.Sequence[int]] = None,
            *,
            timeout: typing.Union[int, float],
            **kwargs,
    ) -> SimpleDataResponse:
        """Post data to a JSON endpoint, return the JSON converted response if any.

        A dict, list or tuple is sent through the `json` argument of `requests`.
        If given data is a string, it will be previously encoded as if it were UTF-8.
        It is recommended to not send strings but encoded ones as bytes.
        Anything else (such as bytes) is sent unchanged as the request body.

        Request time, errors and exceptions are all logged using the standard
        logger.

        Note that the type of the returned response depends on the endpoint,
        but it will always be some valid JSON.

        To know whether an exception occurred or not check the exception property
        of the return value.

        :param url: Endpoint URL as string.
        :param data: Data to post, either as a dictionary (JSON valid) or a string.
        :param timeout: Connection timeout in seconds (0 for inf).
        :param retries: Any number bigger than 0 implies usage of a retry strategy.
        :param backoff_factor: Factor to apply to wait time for retries.
        :param status_forcelist: Set of status codes over 500 that you want to
                                 forcibly retry.
        :param kwargs: Additional arguments for `requests`.
        :return: An object with the response data (if any), a bool representing
                 the occurrence of an exception and the response HTTP status code.
        """
        if isinstance(data, (dict, list, tuple)):
            payload = {'json': data}
        else:
            payload = {'data': data.encode() if isinstance(data, str) else data}
        # Unpacking both mappings keeps the duplicate-keyword TypeError if the
        # caller also passes `json`/`data` through kwargs.
        return await cls.request(
            'POST',
            url,
            timeout=timeout,
            retries=retries,
            backoff_factor=backoff_factor,
            status_forcelist=status_forcelist,
            **payload,
            **kwargs,
        )
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment