Skip to content

Instantly share code, notes, and snippets.

@ahopkins
Created May 13, 2019 08:00
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ahopkins/912e1cf48fa322355f78522328a541b3 to your computer and use it in GitHub Desktop.
Save ahopkins/912e1cf48fa322355f78522328a541b3 to your computer and use it in GitHub Desktop.
custom_authentication_cls_complex.py
import random
from datetime import datetime
from jwt.exceptions import DecodeError
from sanic import Sanic
from sanic.response import json
from sanic_jwt import (
Authentication,
Claim,
Configuration,
Initialize,
Responses,
exceptions,
)
class User:
def __init__(self, _id, username):
self.user_id = _id
self.username = username
self._permakey = None
self._last_login = None
def __repr__(self):
return "User(id='{}')".format(self.user_id)
def to_dict(self):
return {
"user_id": self.user_id,
"username": self.username,
"last_login": self.last_login,
}
@property
def permakey(self):
return self._permakey
@permakey.setter
def permakey(self, value):
self._permakey = value
@property
def last_login(self):
return self._last_login
@last_login.setter
def last_login(self, value):
self._last_login = value
USERS = []
username_table = {u.username: u for u in USERS}
userid_table = {u.user_id: u for u in USERS}
class MyConfig(Configuration):
def get_verify_exp(self, request):
"""
If the request is with the "permakey", then we do not want to check for expiration
"""
return not "permakey" in request.headers
class MyAuthentication(Authentication):
def _verify(
self,
request,
return_payload=False,
verify=True,
raise_missing=False,
request_args=None,
request_kwargs=None,
*args,
**kwargs
):
"""
If there is a "permakey", then we will verify the token by checking the
database. Otherwise, just do the normal verification.
Typically, any method that begins with an underscore in sanic-jwt should
not be touched. In this case, we are trying to break the rules a bit to handle
a unique use case: handle both expirable and non-expirable tokens.
"""
if "permakey" in request.headers:
# Extract the permakey from the headers
permakey = request.headers.get("permakey")
try:
payload = self._decode(permakey, verify=verify)
except DecodeError:
if return_payload:
return {}
return False, 400, "Invalid token format"
# Sometimes, the application will call _verify(...return_payload=True)
# So, let's make sure to handle this scenario.
if return_payload:
return payload
request_kwargs = request_kwargs or {}
user = request_kwargs.get("user")
# If wer cannot find a user, then this method should return
# is_valid == False
# reason == some text for why
# status == some status code, probably a 401
if not user:
is_valid = False
reason = "No user found"
status = 401
else:
# After finding a user, make sure the permakey matches,
# or else return a bad status or some other error.
# In production, both this scenario, and the above "No user found"
# scenario should return an identical message and status code.
# This is to prevent your application accidentally
# leaking information about the existence or non-existence of users.
is_valid = user.permakey == permakey
reason = None if is_valid else "Permakey mismatch"
status = 200 if is_valid else 401
return is_valid, status, reason
else:
return super()._verify(
request=request,
return_payload=return_payload,
verify=verify,
raise_missing=raise_missing,
request_args=request_args,
request_kwargs=request_kwargs,
*args,
**kwargs
)
async def authenticate(self, request, *args, **kwargs):
username = request.json.get("username", None)
password = request.json.get("password", None)
if not username or not password:
raise exceptions.AuthenticationFailed(
"Missing username or password."
)
# Here, you would want to try to verify the username and password.
# In this example, we are simply creating a new user if it does not
# exist, and then authenticating the new user.
user = username_table.get(username)
if not user:
user = User(len(USERS) + 1, username)
USERS.append(user)
username_table.update({user.username: user})
userid_table.update({user.user_id: user})
user.permakey = await self.generate_access_token(user)
user.last_login = datetime.utcnow().strftime("%c")
return user
async def retrieve_user(self, request, payload, *args, **kwargs):
if payload:
user_id = payload.get("user_id", None)
return userid_table.get(user_id)
else:
return None
def my_payload_extender(payload, *args, **kwargs):
user_id = payload.get("user_id", None)
user = userid_table.get(user_id)
payload.update({"username": user.username})
return payload
class MyResponses(Responses):
@staticmethod
def extend_authenticate(
request, user=None, access_token=None, refresh_token=None
):
return {"permakey": user.permakey}
class RandomClaim(Claim):
"""
This custom claim is not necessary. It is merely being added so that everytime
that await Authentication.generate_access_token() is being called, it will
provide a different token. It is for illustrative purposes only
"""
key = "rand"
def setup(self, *args, **kwargs):
return random.random()
def verify(self, *args, **kwargs):
return True
# elsewhere in the universe ...
if __name__ == "__main__":
app = Sanic(__name__)
sanicjwt = Initialize(
app,
authentication_class=MyAuthentication,
configuration_class=MyConfig,
# Following settings are for example purposes only
responses_class=MyResponses,
custom_claims=[RandomClaim],
extend_payload=my_payload_extender,
expiration_delta=15,
leeway=0,
)
@app.route("/")
async def helloworld(request):
return json({"hello": "world"})
@app.route("/protected")
@sanicjwt.inject_user()
@sanicjwt.protected()
async def protected_request(request, user):
return json({"protected": True})
# this route is for demonstration only
@app.route("/cache")
@sanicjwt.protected()
async def protected_cache(request):
print(USERS)
return json(userid_table)
app.run(debug=True)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment