Skip to content

Instantly share code, notes, and snippets.

@mikeywaites
Last active December 3, 2017 07:57
Show Gist options
  • Save mikeywaites/6fc0c68a49c63a5785db4155476d063d to your computer and use it in GitHub Desktop.
Save mikeywaites/6fc0c68a49c63a5785db4155476d063d to your computer and use it in GitHub Desktop.
import json
from flask import current_app, request, g
from werkzeug.wrappers import Response
from werkzeug.exceptions import HTTPException
from star_wars.models import User, Client
from .utils import decode_client_token, get_token_from_request
def check_auth(auth):
un = current_app.config['BASIC_AUTH_USERNAME']
pw = current_app.config['BASIC_AUTH_PASSWORD']
return auth.username == un and auth.password == pw
def basic_auth(endpoint):
auth = request.authorization
if not auth or not check_auth(auth):
resp = Response(
response=json.dumps({'message': 'Not Authorized', 'error': True}),
headers = {
'WWW-Authenticate': 'Basic realm="Login Required"'
},
mimetype='application/json',
status=401
)
raise HTTPException('Api Error', response=resp)
def get_api_client_from_request(endpoint):
"""Ensure a valid api_key query param was provided with the request and store the
client on `g.api_client`
"""
api_key = request.args.get('api_key', None)
if not api_key:
payload = {'message': 'Missing required ?api_key param'}
endpoint.return_error(422, payload=payload)
client = Client.query.filter_by(client_id=api_key).one_or_none()
if client is None:
payload = {'message': 'API Client not found'}
endpoint.return_error(404, payload=payload)
g.api_client = client
return client
def get_client_token(endpoint):
"""Validate the request has a valid JWT token. If the provided api_key and token
a valid the JWT will be decoded and stored on g.token for use later on.
"""
token = get_token_from_request()
if not token:
payload = {'message': 'Request does not contain token'}
endpoint.return_error(
401,
payload=payload
)
payload = decode_client_token(client, token)
if not payload:
payload = {'message': 'Invalid token'}
endpoint.return_error(
401,
payload=payload,
)
g.token = payload
def get_client_user_from_token(endpoint):
"""Using the provided sub field of a JWT, query the database to find the user by ID
and validate that the provided client has permissions to access this user.
"""
# The subject is matched against User.user_id and must be owned by the api_client.
user = User.query.filter_by(id=token['sub'], client_id=client.id).one_or_none()
if not user:
current_app.logger.debug('User not found')
endpoint.return_error(message='User not found.', status=404)
g.user = user
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment