Skip to content

Instantly share code, notes, and snippets.

@henriquebastos
Created February 18, 2022 01:30
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save henriquebastos/50d84ec5d1147888bcd84f5bff612c97 to your computer and use it in GitHub Desktop.
Save henriquebastos/50d84ec5d1147888bcd84f5bff612c97 to your computer and use it in GitHub Desktop.
Implementação de um AuthBase complexo como exemplo.
from datetime import datetime
from requests.auth import AuthBase
class EduzzAuth(AuthBase):
    """Requests auth handler that keeps an API token up to date.

    Every outgoing request gets the current token in its ``Token`` header.
    A new token is requested from ``AUTH_PATH`` whenever none is stored or
    the stored one is past its expiration time.  Two response hooks are
    registered on each request: ``handle_401`` retries a request rejected
    with ``TOKEN_EXPIRED_ERROR_CODE`` and ``store_refreshed_token`` stores
    the token found in the ``profile`` section of successful responses.
    """

    AUTH_PATH = "/credential/generate_token"
    TOKEN_EXPIRED_ERROR_CODE = "#0029"

    def __init__(self, email, publickey, apikey, session_class):
        """Keep the credentials and the session factory used by ``renew``."""
        self.session_class = session_class
        self.credentials = {
            "email": email,
            "publickey": publickey,
            "apikey": apikey,
        }
        # No token is known until the first renewal (or a test injects one).
        self._token_valid_until = None
        self._token = None

    @property
    def has_token(self):
        """Whether a non-empty token is currently stored."""
        return True if self._token else False

    @property
    def is_expired(self):
        """Whether the stored token is missing an expiration or is past it."""
        deadline = self._token_valid_until
        return deadline is None or datetime.now() > deadline

    @property
    def token(self):
        """Return the stored token, renewing it first when it is expired."""
        if self.is_expired:
            self.renew()
        return self._token

    @token.setter
    def token(self, values):
        """Store a ``(token, valid_until)`` pair; direct use is meant for tests."""
        new_token, valid_until = values
        self._token = new_token
        self._token_valid_until = valid_until

    def renew(self):
        """Request a fresh token from ``AUTH_PATH`` and store it.

        HTTP error statuses are raised through ``raise_for_status``.
        """
        response = self.session_class().post(
            self.AUTH_PATH, params=self.credentials
        )
        response.raise_for_status()
        data = response.json()["data"]
        self.token = data["token"], data["token_valid_until"]

    def handle_401(self, r, **kwargs):
        """Response hook: retry once with a new token after a token-expired 401.

        Any response that is not a 401 carrying ``TOKEN_EXPIRED_ERROR_CODE``
        is returned untouched.
        """
        if r.status_code != 401:
            return r

        # Only an expired token is recoverable; other 401s pass through.
        if r.json()["code"] != self.TOKEN_EXPIRED_ERROR_CODE:
            return r

        # Force a renewal regardless of the locally known expiration time.
        self.renew()

        # Consume the content and release the original connection so the
        # retried request can reuse it.
        r.content
        r.close()

        retry = r.request.copy()
        retry.headers["Token"] = self.token

        retried = r.connection.send(retry, **kwargs)
        retried.history.append(r)
        retried.request = retry
        return retried

    def store_refreshed_token(self, r, **kwargs):
        """Response hook: store the token carried by a successful response.

        Error responses (4xx/5xx), responses from ``AUTH_PATH`` itself and
        bodies whose ``success`` flag is not ``True`` are ignored.  The
        response is always returned unchanged.
        """
        if 400 <= r.status_code < 600 or r.request.url.endswith(self.AUTH_PATH):
            return r

        body = r.json()
        if body["success"] is True:
            profile = body["profile"]
            self.token = profile["token"], profile["token_valid_until"]
        return r

    def __call__(self, r):
        """Attach the token header and both response hooks to request ``r``."""
        r.headers["Token"] = self.token
        for hook in (self.handle_401, self.store_refreshed_token):
            r.register_hook("response", hook)
        return r
import pytest
@pytest.fixture
def httpretty(allow_net_connect=False, verbose=False):
    """Yield the ``httpretty`` module, enabled for the duration of a test.

    The module is reset before being enabled, then disabled and reset again
    once the test that requested the fixture has finished.
    """
    import httpretty as mocker

    mocker.reset()
    mocker.enable(allow_net_connect=allow_net_connect, verbose=verbose)

    yield mocker

    mocker.disable()
    mocker.reset()
from datetime import datetime
from re import compile as regex
import pytest
import requests
from freezegun import freeze_time
from responses import RequestsMock
from eduzz.sessions import EduzzAuth, EduzzAPIError, EduzzSession
from eduzz.tests import ResponsesSequence
# Reference instants shared by the token-expiration tests.
NOW = datetime(2021, 12, 4)
BEFORE_NOW = datetime(2021, 12, 3, 23, 59, 59)  # one second before NOW
NOW_PLUS_15 = NOW.replace(minute=15)  # fifteen minutes after NOW
def test_auth_is_expired_on_init(auth):
    """An auth object that never received a token reports itself as expired."""
    assert auth.is_expired
@freeze_time(NOW)
def test_auth_expiration_logic(auth):
    """Check expiration before any token, exactly at the limit and just past it."""
    assert auth.is_expired, "Token should be empty."

    # A token valid until the frozen current time still counts as valid.
    on_the_limit = ("VALID", NOW)
    auth.token = on_the_limit
    assert not auth.is_expired, "Token should be valid on the limit."

    # One second earlier and the token is expired.
    just_past = ("EXPIRED", BEFORE_NOW)
    auth.token = just_past
    assert auth.is_expired, "Token should be just expired."
def test_auth_renew_when_token_empty(auth, req, responses):
    """Preparing a request without a stored token fetches one and sends it."""
    token_endpoint = regex(".+/generate_token")
    granted = token_body(token="VALID")
    responses.add(responses.POST, token_endpoint, json=granted, status=201)

    req.prepare("GET", "https://h/first", auth=auth)

    assert req.headers["Token"] == "VALID"
@freeze_time(NOW)
def test_auth_renew_when_token_expired(auth, req, responses):
    """An already expired token is replaced before the request is prepared."""
    auth.token = ("EXPIRED", BEFORE_NOW)

    token_endpoint = regex(".+/generate_token")
    granted = token_body("VALID", NOW_PLUS_15)
    responses.add(responses.POST, token_endpoint, json=granted, status=201)

    req.prepare("GET", "https://h/first", auth=auth)

    assert req.headers["Token"] == "VALID"
def test_auth_raises_for_empty_credentials(auth, req, responses):
    """A 401 with code #0001 from the token endpoint raises EduzzAPIError."""
    token_endpoint = regex(".+/generate_token")
    rejection = error_body("#0001")
    responses.add(responses.POST, token_endpoint, json=rejection, status=401)

    with pytest.raises(EduzzAPIError, match="#0001 Empty credentials"):
        req.prepare("GET", "https://h/first", auth=auth)
def test_auth_raises_for_invalid_credentials(auth, req, responses):
    """A 401 with code #0002 from the token endpoint raises EduzzAPIError."""
    token_endpoint = regex(".+/generate_token")
    rejection = error_body("#0002")
    responses.add(responses.POST, token_endpoint, json=rejection, status=401)

    with pytest.raises(EduzzAPIError, match="#0002 Invalid credentials"):
        req.prepare("GET", "https://h/first", auth=auth)
def test_auth_raise_for_forbidden_access(auth, req, responses):
    """A 401 with code #0010 from the token endpoint raises EduzzAPIError."""
    token_endpoint = regex(".+/generate_token")
    rejection = error_body("#0010")
    responses.add(responses.POST, token_endpoint, json=rejection, status=401)

    with pytest.raises(EduzzAPIError, match="#0010 Forbiden access"):
        req.prepare("GET", "https://h/first", auth=auth)
@freeze_time(NOW)
def test_auth_recover_from_undetected_expired_token(auth, responses):
    """A request rejected with the expired-token code is retried with a new token.

    Four calls are expected: the initial token request, the rejected GET,
    the token renewal and the retried GET.
    """
    # The token endpoint hands out T1 first and T2 on the renewal.
    token_replies = ResponsesSequence(
        (201, "", token_body("T1", NOW)),
        (201, "", token_body("T2", NOW_PLUS_15)),
    )
    responses.add_callback(
        responses.POST, regex(".+/generate_token"), token_replies
    )

    # The resource rejects the first attempt as expired, then succeeds.
    resource_replies = ResponsesSequence(
        (401, "", error_body("#0029")),
        (200, "", data_body()),
    )
    responses.add_callback(responses.GET, "https://h/first", resource_replies)

    r = requests.get("https://h/first", auth=auth)

    assert len(responses.calls) == 4
    assert r.status_code == 200
@freeze_time(NOW)
def test_auth_updates_with_refreshed_token(auth, responses):
    """A successful response carrying a refreshed token updates the auth.

    The token endpoint grants ``VALID`` and the resource response carries
    ``T2`` in its ``profile`` section; afterwards the auth must hold ``T2``
    together with its new expiration time.
    """
    responses.add(
        responses.POST,
        regex(".+/generate_token"),
        status=201,
        json=token_body("VALID", NOW),
    )
    responses.add(
        responses.GET,
        regex(".+/first"),
        status=200,
        json=data_body(token="T2", token_valid_until=NOW_PLUS_15),
    )

    requests.get("https://h/first", auth=auth)

    assert len(responses.calls) == 2
    # `assert a == b, c` uses `c` as the failure message, so the token and
    # its expiration time need separate assertions to both be checked.
    assert auth.token == "T2"
    assert auth._token_valid_until == NOW_PLUS_15
@pytest.fixture
def auth():
    """Provide an ``EduzzAuth`` built with dummy credentials and ``EduzzSession``."""
    credentials = ("e@mail.com", "PUBLICKEY", "APIKEY")
    return EduzzAuth(*credentials, EduzzSession)
@pytest.fixture
def req():
    """Provide a blank ``requests.PreparedRequest`` for the test to prepare."""
    prepared = requests.PreparedRequest()
    return prepared
@pytest.fixture
def responses(monkeypatch):
    """Yield an active ``RequestsMock`` with the project's JSON codecs patched in.

    While the mock is active, ``loads`` and ``dumps`` of the JSON module used
    by ``responses`` default to ``BetterJSONDecoder`` / ``BetterJSONEncoder``.
    """
    import responses as responses_module
    from functools import partial
    from eduzz.sessions.json_session import BetterJSONEncoder, BetterJSONDecoder

    json_module = responses_module.json_module

    with monkeypatch.context() as patcher:
        patcher.setattr(
            json_module, "loads", partial(json_module.loads, cls=BetterJSONDecoder)
        )
        patcher.setattr(
            json_module, "dumps", partial(json_module.dumps, cls=BetterJSONEncoder)
        )

        # The mock must stay inside the patch context so the patched codecs
        # remain in place while the test runs.
        with RequestsMock() as rm:
            yield rm
# Error codes used by the tests, mapped to the text that `error_body` puts in
# the "details" field.  The tests match on these exact strings (including the
# "Forbiden" spelling), so they must not be edited casually.
ERRORS = {
    "#0001": "Empty credentials",
    "#0002": "Invalid credentials",
    "#0010": "Forbiden access",
    "#0029": "Expired Jwt Token",
}
def error_body(code):
    """Build the JSON body of a failed response for the given error *code*.

    Raises ``KeyError`` when *code* is not listed in ``ERRORS``.
    """
    body = {"success": False, "code": code}
    body["details"] = ERRORS[code]
    body["link"] = "https://api2.eduzz.com"
    return body
def token_body(token="VALID", token_valid_until=NOW_PLUS_15):
    """Build the JSON body of a successful token-generation response."""
    granted = {"token": token, "token_valid_until": token_valid_until}
    return {"success": True, "data": granted}
def data_body(token="VALID", token_valid_until=NOW_PLUS_15):
    """Build a successful data response whose profile carries a token."""
    records = [{"id": 1}]
    profile = {"token": token, "token_valid_until": token_valid_until}
    return {"success": True, "data": records, "profile": profile}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment