Last active
September 28, 2018 13:54
-
-
Save Integralist/7dfb75e058f584761948169a93dd1838 to your computer and use it in GitHub Desktop.
[Python Tornado Decorator with arguments] #python #decorator #tornado #dict #compare
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def auth(*args, **kwargs):
    """Build a decorator that guards a Tornado handler method with token auth.

    Usage: ``@auth('get', client=gateway_client)``. Positional arguments are
    accepted but not used here. The ``client`` keyword argument is required
    and is handed to ``verify_tokens`` on every request.

    The decorated coroutine answers with ``send_error(status_code=401)`` when:

    * the cookie is missing or fails ``invalid_cookie``;
    * no tokens can be extracted from the cookie;
    * ``verify_tokens`` yields no payload.

    Otherwise the wrapped handler is awaited with an extra ``auth_data``
    keyword argument holding the verified payload.

    Raises:
        KeyError: if ``client`` is not supplied as a keyword argument.
    """
    client = kwargs['client']

    def wrapper(handler_method):
        """Wrap a handler method, e.g. RequestHandler.get or RequestHandler.post."""

        @wraps(handler_method)
        async def request_handler_wrapper(*args, **kwargs):
            """Authenticate the request, then delegate to the wrapped handler.

            ``args[0]`` is the request handler the method is bound to; the
            remaining arguments are the params from tornado.routing.Rule.
            """
            request_handler = args[0]

            # Failures respond through send_error rather than raising
            # tornado.web.HTTPError(401).
            cookie = acquire_cookie(request_handler)
            if invalid_cookie(cookie):
                return request_handler.send_error(status_code=401)

            tokens = extract_tokens_from_cookie(cookie)
            if not tokens:
                return request_handler.send_error(status_code=401)

            id_payload = await verify_tokens(tokens, client)
            if not id_payload:
                return request_handler.send_error(status_code=401)

            # The verified payload is merged last so that a routing parameter
            # which happens to be named "auth_data" can never replace it.
            kwargs = {**kwargs, "auth_data": id_payload}
            return await handler_method(*args, **kwargs)

        return request_handler_wrapper

    return wrapper
def acquire_cookie(request_handler):
    """Look up the auth cookie on the given request handler.

    Delegates to ``request_handler.get_cookie`` using the module-level
    ``cookie_name`` and returns whatever that call returns.
    """
    cookie_value = request_handler.get_cookie(cookie_name)
    return cookie_value
def invalid_cookie(cookie):
    """Return True when the cookie cannot carry the expected auth tokens.

    A cookie is considered invalid when it is falsy (``None`` or empty) or
    when it does not contain the substring ``'access_token'``.
    """
    return not cookie or 'access_token' not in cookie
# Compiled once at import time instead of on every request.
_TOKENS_PATTERN = re.compile(
    r'id_token=(?P<id_token>.+),'
    r'access_token=(?P<access_token>.+),'
    r'refresh_token=(?P<refresh_token>.+)'
)


def extract_tokens_from_cookie(cookie):
    """Search the cookie value for the three comma-separated auth tokens.

    Returns the regex match object, exposing the named groups ``id_token``,
    ``access_token`` and ``refresh_token``, or ``None`` when the cookie does
    not contain them in that order.
    """
    return _TOKENS_PATTERN.search(cookie)
def build_endpoint(host: str, path: str) -> str:
    """Return the ``https://`` URL for ``path`` on ``host``.

    When the configured cluster is not ``'prod'`` and ``host`` is a
    buzzfeed.com host other than the stage API gateway, the webapp stage
    credentials are embedded in the URL as ``user:pass@host``.
    """
    not_prod = settings.get('cluster') != 'prod'
    buzzfeed = 'buzzfeed.com' in host
    # Dots are escaped so they match a literal '.' only; unescaped, a host
    # such as 'api-stageXbuzzfeed.com' would be mistaken for the gateway.
    gateway = re.search(r'api-(?:public-)?stage\.buzzfeed\.com', host)

    if not_prod and buzzfeed and not gateway:
        creds = '{}:{}'.format(webapp_stage_user, webapp_stage_pass)
        host = '{}@{}'.format(creds, host)

    return 'https://{}{}'.format(host, path)
def _record_verify_failure(context):
    """Increment the ``verify_tokens`` error metric tagged with ``context``."""
    metrics.incr("verify_tokens", tags={"service": service,
                                        "status": "error",
                                        "context": context})


async def verify_tokens(tokens, api_gateway):
    """Verify the extracted tokens against the auth service.

    Args:
        tokens: regex match exposing the ``id_token``, ``access_token`` and
            ``refresh_token`` groups; a falsy value short-circuits to ``None``.
        api_gateway: client whose awaitable ``post`` method performs the
            request.

    Returns:
        The ``payload`` entry of the decoded JSON response, or ``None`` when
        the tokens are missing, the request fails, the body cannot be decoded
        into a JSON object, or the response reports ``status == 'error'``.
        Every failure after the request is attempted is counted via the
        ``verify_tokens`` metric.
    """
    if not tokens:
        return None

    api_path = '/bfauth/tokens/verify'
    endpoint = build_endpoint(api_gateway_host, api_path)
    post_data = {'id_token': tokens.group('id_token'),
                 'access_token': tokens.group('access_token'),
                 'refresh_token': tokens.group('refresh_token')}
    body = urllib.parse.urlencode(post_data)

    try:
        response = await api_gateway.post(endpoint, body=body, debug=debug,
                                          request_timeout=10)
    except (bf_api_gateway.errors.BackoffError,
            bf_api_gateway.errors.APIGatewayError) as err:
        _record_verify_failure("{}. {}".format(type(err), str(err)))
        return None

    try:
        response_data = json.loads(response.body)
    except Exception as err:
        # Deliberately broad: any decoding problem counts as a failed
        # verification, so the caller answers 401 instead of crashing.
        _record_verify_failure("{}. {}".format(type(err), str(err)))
        return None

    # Valid JSON that is not an object (list, string, number) has no .get();
    # treat it as a failed verification rather than raising AttributeError.
    if not isinstance(response_data, dict):
        _record_verify_failure(
            "unexpected response type {}".format(type(response_data)))
        return None

    if response_data.get('status') == 'error':
        _record_verify_failure("status returned error")
        return None

    return response_data.get('payload')
class AuthTest(tornado.web.RequestHandler):
    """Request handler whose GET method is protected by the ``auth`` decorator."""

    @auth('get', client=mock_apigateway)
    async def get(self, *args, **kwargs):
        """Echo the received keyword arguments back as the response body.

        Example: kwargs = {"auth_data": "...", "path": "protected"}
        """
        response_body = kwargs
        self.finish(response_body)
return tornado.web.Application([ | |
(r'/(?P<path>protected)', AuthTest), | |
]) | |
@mock.patch("bf_auth.subject.build_endpoint")
def test_successful_authentication(self, mock_endpoint):
    """Test a cookie is acquired, and the tokens are extracted and verified."""
    mock_cookie = 'mycookiename=id_token=1,access_token=2,refresh_token=3;'
    mock_endpoint.return_value = 'https://example-api.com'
    mock_apigateway.post.side_effect = mock_verify_tokens_success

    response = self.fetch('/protected', headers={"Cookie": mock_cookie})

    assert response.code == 200

    # Dict equality does not depend on key order, so the decoded body is
    # compared directly instead of via a sorted JSON string.
    actual = json.loads(response.body.decode())
    expected = {"auth_data": "foobar", "path": "protected"}
    assert actual == expected
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment