Skip to content

Instantly share code, notes, and snippets.

@Integralist
Last active September 28, 2018 13:54
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Integralist/7dfb75e058f584761948169a93dd1838 to your computer and use it in GitHub Desktop.
Save Integralist/7dfb75e058f584761948169a93dd1838 to your computer and use it in GitHub Desktop.
[Python Tornado Decorator with arguments] #python #decorator #tornado #dict #compare
def auth(*args, **kwargs):
    """Build a decorator that authenticates a Tornado handler method.

    The decorated method only runs when the request carries a cookie whose
    tokens are successfully verified through ``client``. The verified
    payload is handed to the method as the ``auth_data`` keyword argument.
    On any failure the wrapper calls ``send_error(status_code=401)`` on the
    request handler and the decorated method is not invoked.

    Args:
        *args: Accepted but not used by this function.
        **kwargs: Must contain ``client``, the object passed on to
            ``verify_tokens`` to perform the verification request.

    Returns:
        A decorator for async request handler methods.

    Raises:
        KeyError: If ``client`` is not supplied as a keyword argument.
    """
    client = kwargs['client']

    def wrapper(handler_method):
        """Wrap ``handler_method`` (e.g. RequestHandler.get or RequestHandler.post)."""

        @wraps(handler_method)
        async def request_handler_wrapper(*args, **kwargs):
            """Authenticate, then delegate to the wrapped handler method.

            The first positional argument is the request handler; the
            remaining arguments are the params from tornado.routing.Rule.
            """
            request_handler = args[0]

            cookie = acquire_cookie(request_handler)
            if invalid_cookie(cookie):
                return request_handler.send_error(status_code=401)

            tokens = extract_tokens_from_cookie(cookie)
            if not tokens:
                return request_handler.send_error(status_code=401)

            id_payload = await verify_tokens(tokens, client)
            if not id_payload:
                return request_handler.send_error(status_code=401)

            handler_kwargs = {"auth_data": id_payload, **kwargs}
            # The verified payload must always win: without this, a route
            # parameter that happens to be named "auth_data" would silently
            # replace it. Assigning after the merge keeps the key order.
            handler_kwargs["auth_data"] = id_payload
            return await handler_method(*args, **handler_kwargs)

        return request_handler_wrapper

    return wrapper
def acquire_cookie(request_handler):
    """Return the cookie named ``cookie_name`` from ``request_handler``.

    The lookup is delegated to ``request_handler.get_cookie``; whatever that
    call returns is passed back unchanged.
    """
    cookie_value = request_handler.get_cookie(cookie_name)
    return cookie_value
def invalid_cookie(cookie):
    """Tell whether ``cookie`` is unusable for authentication.

    A cookie is invalid when it is missing/empty or when it does not
    contain the text ``access_token``.

    Returns:
        ``True`` for an invalid cookie, ``False`` otherwise.
    """
    return not cookie or 'access_token' not in cookie
# The three tokens are expected comma separated and in this fixed order.
# Compiled once at import time instead of on every call.
_TOKENS_PATTERN = re.compile(
    r'id_token=(?P<id_token>.+),'
    r'access_token=(?P<access_token>.+),'
    r'refresh_token=(?P<refresh_token>.+)'
)


def extract_tokens_from_cookie(cookie):
    """Locate the id, access and refresh tokens inside ``cookie``.

    Args:
        cookie: The cookie text to search. ``None`` or an empty string is
            tolerated and treated as "no tokens".

    Returns:
        A regex match object exposing the named groups ``id_token``,
        ``access_token`` and ``refresh_token``, or ``None`` when the cookie
        is missing/empty or does not contain the expected layout.
    """
    if not cookie:
        # Searching ``None`` would raise TypeError; a missing cookie simply
        # has no tokens, which callers already handle as a falsy result.
        return None
    return _TOKENS_PATTERN.search(cookie)
def build_endpoint(host, path) -> str:
    """Return the ``https`` URL for ``path`` on ``host``.

    When the ``cluster`` setting is not ``prod`` and ``host`` contains
    ``buzzfeed.com`` but is not one of the ``api-stage`` /
    ``api-public-stage`` gateway hosts, the stage credentials are embedded
    in the URL as ``user:pass@host``.

    Args:
        host: Host name the URL points at.
        path: Path appended directly after the host; it is not modified,
            so it is expected to start with ``/``.

    Returns:
        The assembled URL string.
    """
    not_prod = settings.get('cluster') != 'prod'
    buzzfeed = 'buzzfeed.com' in host
    # Raw string with escaped dots: an unescaped "." matches any character,
    # which would let look-alike host names pass as the gateway.
    gateway = re.search(r'api-(?:public-)?stage\.buzzfeed\.com', host)

    if not_prod and buzzfeed and not gateway:
        creds = '{}:{}'.format(webapp_stage_user, webapp_stage_pass)
        host = '{}@{}'.format(creds, host)

    return 'https://{}{}'.format(host, path)
def _record_verify_failure(context):
    """Increment the ``verify_tokens`` error metric, tagged with ``context``."""
    metrics.incr("verify_tokens", tags={"service": service,
                                        "status": "error",
                                        "context": context})


async def verify_tokens(tokens, api_gateway):
    """Verify the extracted tokens against the ``/bfauth/tokens/verify`` endpoint.

    The three tokens are url-encoded and POSTed through ``api_gateway``.
    Every failure is reported through the ``verify_tokens`` metric and
    results in ``None`` so that callers can treat any falsy result as
    "not authenticated".

    Args:
        tokens: Match object exposing the groups ``id_token``,
            ``access_token`` and ``refresh_token``; a falsy value short
            circuits to ``None`` without any request being made.
        api_gateway: Client object whose awaitable ``post`` method performs
            the request.

    Returns:
        The ``payload`` entry of the decoded response, or ``None`` when the
        tokens are missing, the request fails, the body cannot be decoded
        as a JSON object, or the response reports ``status == 'error'``.
    """
    if not tokens:
        return None

    api_path = '/bfauth/tokens/verify'
    endpoint = build_endpoint(api_gateway_host, api_path)
    post_data = {'id_token': tokens.group('id_token'),
                 'access_token': tokens.group('access_token'),
                 'refresh_token': tokens.group('refresh_token')}
    body = urllib.parse.urlencode(post_data)

    try:
        response = await api_gateway.post(endpoint, body=body, debug=debug, request_timeout=10)
    except (bf_api_gateway.errors.BackoffError,
            bf_api_gateway.errors.APIGatewayError) as err:
        _record_verify_failure("{}. {}".format(type(err), str(err)))
        return None

    try:
        response_data = json.loads(response.body)
    except Exception as err:
        # Deliberately broad: whatever goes wrong while reading the body,
        # the tokens were not verified, so fail closed instead of raising.
        _record_verify_failure("{}. {}".format(type(err), str(err)))
        return None

    if not isinstance(response_data, dict):
        # Valid JSON that is not an object (null, list, string, ...) has no
        # ``.get``; treat it as a failed verification rather than crashing.
        _record_verify_failure("response is not a JSON object")
        return None

    if response_data.get('status') == 'error':
        _record_verify_failure("status returned error")
        return None

    return response_data.get('payload')
class AuthTest(tornado.web.RequestHandler):
    """Request handler whose GET is guarded by the ``auth`` decorator."""

    @auth('get', client=mock_apigateway)
    async def get(self, *args, **kwargs):
        """Write the received keyword arguments back as the response.

        Example: kwargs = {"auth_data": "...", "path": "protected"}
        """
        received_kwargs = kwargs
        self.finish(received_kwargs)
# NOTE(review): this `return` has no enclosing `def` in view -- presumably it
# closes the test case's application factory; confirm against the full file.
# Builds the Tornado application with a single route: the named group `path`
# matches the literal text "protected" and the route is served by AuthTest.
return tornado.web.Application([
(r'/(?P<path>protected)', AuthTest),
])
@mock.patch("bf_auth.subject.build_endpoint")
def test_successful_authentication(self, mock_endpoint):
    """Test a cookie is acquired, and the tokens are extracted and verified."""
    mock_cookie = 'mycookiename=id_token=1,access_token=2,refresh_token=3;'
    mock_endpoint.return_value = 'https://example-api.com'
    mock_apigateway.post.side_effect = mock_verify_tokens_success

    response = self.fetch('/protected', headers={"Cookie": mock_cookie})

    assert response.code == 200

    # Compare the parsed JSON instead of its serialized text: dict equality
    # does not depend on key order, so no sort_keys round trip is needed.
    actual = json.loads(response.body.decode())
    assert actual == {"auth_data": "foobar", "path": "protected"}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment