Last active
December 3, 2017 07:57
-
-
Save mikeywaites/6fc0c68a49c63a5785db4155476d063d to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json | |
from flask import current_app, request, g | |
from werkzeug.wrappers import Response | |
from werkzeug.exceptions import HTTPException | |
from star_wars.models import User, Client | |
from .utils import decode_client_token, get_token_from_request | |
def check_auth(auth): | |
un = current_app.config['BASIC_AUTH_USERNAME'] | |
pw = current_app.config['BASIC_AUTH_PASSWORD'] | |
return auth.username == un and auth.password == pw | |
def basic_auth(endpoint): | |
auth = request.authorization | |
if not auth or not check_auth(auth): | |
resp = Response( | |
response=json.dumps({'message': 'Not Authorized', 'error': True}), | |
headers = { | |
'WWW-Authenticate': 'Basic realm="Login Required"' | |
}, | |
mimetype='application/json', | |
status=401 | |
) | |
raise HTTPException('Api Error', response=resp) | |
def get_api_client_from_request(endpoint): | |
"""Ensure a valid api_key query param was provided with the request and store the | |
client on `g.api_client` | |
""" | |
api_key = request.args.get('api_key', None) | |
if not api_key: | |
payload = {'message': 'Missing required ?api_key param'} | |
endpoint.return_error(422, payload=payload) | |
client = Client.query.filter_by(client_id=api_key).one_or_none() | |
if client is None: | |
payload = {'message': 'API Client not found'} | |
endpoint.return_error(404, payload=payload) | |
g.api_client = client | |
return client | |
def get_client_token(endpoint): | |
"""Validate the request has a valid JWT token. If the provided api_key and token | |
a valid the JWT will be decoded and stored on g.token for use later on. | |
""" | |
token = get_token_from_request() | |
if not token: | |
payload = {'message': 'Request does not contain token'} | |
endpoint.return_error( | |
401, | |
payload=payload | |
) | |
payload = decode_client_token(client, token) | |
if not payload: | |
payload = {'message': 'Invalid token'} | |
endpoint.return_error( | |
401, | |
payload=payload, | |
) | |
g.token = payload | |
def get_client_user_from_token(endpoint): | |
"""Using the provided sub field of a JWT, query the database to find the user by ID | |
and validate that the provided client has permissions to access this user. | |
""" | |
# The subject is matched against User.user_id and must be owned by the api_client. | |
user = User.query.filter_by(id=token['sub'], client_id=client.id).one_or_none() | |
if not user: | |
current_app.logger.debug('User not found') | |
endpoint.return_error(message='User not found.', status=404) | |
g.user = user |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment