Skip to content

Instantly share code, notes, and snippets.

@jhshengxy
Created January 22, 2020 02:28
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jhshengxy/241b070490569530bae586e0a97179ee to your computer and use it in GitHub Desktop.
Save jhshengxy/241b070490569530bae586e0a97179ee to your computer and use it in GitHub Desktop.
import jwt
from django.contrib.auth import get_user_model
from django.utils.encoding import smart_text
from django.utils.translation import ugettext as _
from rest_framework import exceptions
from rest_framework.authentication import (
BaseAuthentication, get_authorization_header
)
from rest_framework_jwt.settings import api_settings
# Handlers are read from the rest_framework_jwt settings once, at import time.
# `jwt_decode_handler` is called with the raw token and yields its payload.
jwt_decode_handler = api_settings.JWT_DECODE_HANDLER
# `jwt_get_username_from_payload` is called with a decoded payload and yields
# the username used to look up the user.
jwt_get_username_from_payload = api_settings.JWT_PAYLOAD_GET_USERNAME_HANDLER
def _get_hash(user):
    """
    Return a short fingerprint of the user's stored password value.

    The fingerprint is the last six characters of ``user.password`` (the
    whole value when it is shorter than six characters). It is embedded in
    issued tokens and compared on every request, so a token stops being
    accepted once the stored password value changes.
    """
    stored_password = user.password
    fingerprint = stored_password[-6:]
    return fingerprint
class BaseJSONWebTokenAuthentication(BaseAuthentication):
    """
    Token based authentication using the JSON Web Token standard.

    Subclasses must implement ``get_jwt_value(request)``, returning the raw
    token found on the request, or ``None`` when no token was supplied.
    """

    def authenticate(self, request):
        """
        Returns a two-tuple of `User` and token if a valid signature has been
        supplied using JWT-based authentication. Otherwise returns `None`.

        Raises `AuthenticationFailed` when the token has expired, cannot be
        decoded, is otherwise invalid, or does not map to an acceptable user.
        """
        jwt_value = self.get_jwt_value(request)
        if jwt_value is None:
            # No token on the request: let other authenticators have a go.
            return None

        try:
            payload = jwt_decode_handler(jwt_value)
        except jwt.ExpiredSignatureError:
            # Use the canonical exception name; the legacy alias
            # `jwt.ExpiredSignature` is deprecated and missing from newer
            # PyJWT releases, where referencing it would raise AttributeError
            # while handling any decode failure.
            msg = _('Signature has expired.')
            raise exceptions.AuthenticationFailed(msg)
        except jwt.DecodeError:
            msg = _('Error decoding signature.')
            raise exceptions.AuthenticationFailed(msg)
        except jwt.InvalidTokenError:
            raise exceptions.AuthenticationFailed()

        user = self.authenticate_credentials(payload)
        return (user, jwt_value)

    def authenticate_credentials(self, payload):
        """
        Returns the active user whose username matches the payload and whose
        current password fingerprint matches the payload's `pw_hash`.

        Raises `AuthenticationFailed` when the payload carries no username,
        the user does not exist, the account is inactive, or the password
        fingerprint stored in the token no longer matches the user's.
        """
        User = get_user_model()
        username = jwt_get_username_from_payload(payload)
        if not username:
            msg = _('Invalid payload.')
            raise exceptions.AuthenticationFailed(msg)

        try:
            # Lookup is by the model's `username` field specifically.
            user = User.objects.get(username=username)
        except User.DoesNotExist:
            msg = _('Invalid signature.')
            raise exceptions.AuthenticationFailed(msg)

        if not user.is_active:
            msg = _('User account is disabled.')
            raise exceptions.AuthenticationFailed(msg)

        # Tokens embed a fingerprint of the password value at issue time; a
        # mismatch (including a token without `pw_hash`) rejects the token.
        if payload.get('pw_hash') != _get_hash(user):
            msg = _('Password expired.')
            raise exceptions.AuthenticationFailed(msg)

        return user
class JSONWebTokenAuthentication(BaseJSONWebTokenAuthentication):
    """
    Reads the token from the "Authorization" HTTP header, where it must be
    preceded by the prefix configured in `JWT_AUTH_HEADER_PREFIX`, e.g.:

        Authorization: JWT eyJhbGciOiAiSFMyNTYiLCAidHlwIj

    When the header is absent and `JWT_AUTH_COOKIE` is configured, the token
    is read from that cookie instead.
    """

    www_authenticate_realm = 'api'

    def get_jwt_value(self, request):
        """
        Extract the raw token from the request.

        Returns `None` when there is no Authorization header (and no cookie
        fallback applies) or when the header uses a different prefix. Raises
        `AuthenticationFailed` when the header has the right prefix but is
        not exactly "<prefix> <token>".
        """
        header_parts = get_authorization_header(request).split()
        expected_prefix = api_settings.JWT_AUTH_HEADER_PREFIX.lower()

        if not header_parts:
            # No header at all: fall back to the cookie, if one is configured.
            cookie_name = api_settings.JWT_AUTH_COOKIE
            if not cookie_name:
                return None
            return request.COOKIES.get(cookie_name)

        supplied_prefix = smart_text(header_parts[0].lower())
        if supplied_prefix != expected_prefix:
            # Header belongs to some other authentication scheme.
            return None

        part_count = len(header_parts)
        if part_count == 1:
            msg = _('Invalid Authorization header. No credentials provided.')
            raise exceptions.AuthenticationFailed(msg)
        if part_count > 2:
            msg = _('Invalid Authorization header. Credentials string '
                    'should not contain spaces.')
            raise exceptions.AuthenticationFailed(msg)

        return header_parts[1]

    def authenticate_header(self, request):
        """
        Build the `WWW-Authenticate` header value sent with 401 responses:
        the configured header prefix followed by this class's realm.
        """
        prefix = api_settings.JWT_AUTH_HEADER_PREFIX
        realm = self.www_authenticate_realm
        return '{0} realm="{1}"'.format(prefix, realm)
# https://github.com/GetBlimp/django-rest-framework-jwt/issues/105#issuecomment-139311438
def jwt_custom_payload_handler(user):
    """
    Build the token payload for `user`, extended with a `pw_hash` claim.

    Starts from the payload produced by rest_framework_jwt's stock
    `jwt_payload_handler` and adds the user's password fingerprint so that
    tokens can be rejected after the stored password value changes.
    """
    # Imported at call time rather than module level
    # (presumably to avoid a circular import with the settings module — confirm).
    from rest_framework_jwt.utils import jwt_payload_handler

    claims = jwt_payload_handler(user)
    claims['pw_hash'] = _get_hash(user)
    return claims
# Example: using the authentication class in an API view
from rest_framework.views import APIView
class UserBalanceAPI(APIView):
    """
    Example view showing how to enable `JSONWebTokenAuthentication` for a
    single API endpoint.
    """

    authentication_classes = (JSONWebTokenAuthentication,)

    def get(self, request):
        """Placeholder GET handler; does nothing and returns `None`."""
        return None
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment