-
-
Save AliRn76/1fb99688315bedb2bf32fc4af0e50157 to your computer and use it in GitHub Desktop.
from django.contrib.auth.models import AnonymousUser | |
from rest_framework.authtoken.models import Token | |
from channels.db import database_sync_to_async | |
from channels.middleware import BaseMiddleware | |
from project.settings import SIMPLE_JWT, SECRET_KEY | |
@database_sync_to_async
def get_user(token_key):
    """Return the user identified by ``token_key``.

    The key is first looked up as a DRF auth token. If no such token exists,
    it is decoded as a JWT signed with ``SECRET_KEY`` and the user id is read
    from the ``SIMPLE_JWT['USER_ID_CLAIM']`` claim.

    Args:
        token_key: Token string supplied by the client.

    Returns:
        The matching active user, or ``AnonymousUser()`` when the token is
        unknown, invalid, expired, carries no user id, or belongs to a
        missing or inactive user.
    """
    # --- Plain token authentication (rest_framework.authtoken) ---
    try:
        # select_related avoids a second query when reading token.user.
        token = Token.objects.select_related('user').get(key=token_key)
    except Token.DoesNotExist:
        # Not a DRF token; fall through and try it as a JWT. Previously this
        # branch returned immediately, leaving the JWT code unreachable.
        pass
    else:
        return token.user if token.user.is_active else AnonymousUser()

    # --- JWT authentication ---
    # Imported here so projects that only use plain tokens do not pay for
    # (or require) these imports at module load time.
    import jwt
    from django.contrib.auth import get_user_model

    try:
        payload = jwt.decode(
            token_key,
            SECRET_KEY,
            algorithms=[SIMPLE_JWT['ALGORITHM']],
        )
    except jwt.exceptions.InvalidTokenError:
        # Base class of DecodeError and ExpiredSignatureError; also covers
        # the other validation failures so a bad token never raises.
        return AnonymousUser()

    user_id = payload.get(SIMPLE_JWT['USER_ID_CLAIM'])
    if user_id is None:
        return AnonymousUser()

    user_model = get_user_model()
    try:
        user = user_model.objects.get(id=user_id)
    except user_model.DoesNotExist:
        return AnonymousUser()
    return user if user.is_active else AnonymousUser()
class TokenAuthMiddleware(BaseMiddleware):
    """Populate ``scope['user']`` from a ``token`` query-string parameter.

    Connections without a usable ``token`` parameter are given an
    ``AnonymousUser``; otherwise the token is resolved with ``get_user``.
    """

    def __init__(self, inner):
        super().__init__(inner)

    async def __call__(self, scope, receive, send):
        """Authenticate the connection, then delegate to the wrapped app."""
        from urllib.parse import parse_qs

        try:
            query_string = scope.get('query_string', b'').decode()
        except UnicodeDecodeError:
            token_key = None
        else:
            # parse_qs tolerates parameters without '=' or with several '='
            # characters (which used to discard the token entirely) and
            # percent-decodes the values.
            token_values = parse_qs(query_string).get('token')
            # Take the last occurrence, matching the previous dict-based
            # parsing when the parameter is repeated.
            token_key = token_values[-1] if token_values else None

        scope['user'] = AnonymousUser() if token_key is None else await get_user(token_key)
        return await super().__call__(scope, receive, send)
front:
new WebSocket(`wss://${window.location.host}/ws?token=${localStorage.getItem('token')}`)
from urllib.parse import parse_qs

from channels.db import database_sync_to_async
from channels.middleware import BaseMiddleware
from django.contrib.auth.models import AnonymousUser
from rest_framework.authtoken.models import Token


@database_sync_to_async
def get_user(token_key):
    """Return the active user owning the DRF token ``token_key``.

    Returns ``AnonymousUser()`` when no such token exists or when its user
    is inactive.
    """
    try:
        # select_related avoids a second query when reading token.user.
        token = Token.objects.select_related('user').get(key=token_key)
    except Token.DoesNotExist:
        return AnonymousUser()
    return token.user if token.user.is_active else AnonymousUser()


class TokenAuthMiddleware(BaseMiddleware):
    """Populate ``scope['user']`` from a ``token`` query-string parameter."""

    def __init__(self, inner):
        super().__init__(inner)

    async def __call__(self, scope, receive, send):
        """Authenticate the connection, then delegate to the wrapped app."""
        try:
            query_string = scope.get('query_string', b'').decode()
        except UnicodeDecodeError:
            query_string = ''

        # Look the parameter up by name: taking the text after the last '='
        # would pick up the value of whichever parameter happens to be last.
        token_values = parse_qs(query_string).get('token')
        if token_values:
            scope['user'] = await get_user(token_values[-1])
        else:
            # No token supplied: skip the database lookup entirely.
            scope['user'] = AnonymousUser()
        return await super().__call__(scope, receive, send)
Hello, I'm hoping to implement this. There is a .decode() call on the token. How was it encoded? Or do tokens in general have to be decoded like that before they can be used?
I found out why: the query string is returned as a byte string. Please ignore the question. Thank you.
I sent the token in the second argument to the WebSocket constructor (used to send the 'sec-websocket-protocol' header) in order to avoid sending the token in the query string, since the query string ends up in clear text in the server logs.
frontend (React.js):
socket: new ReconnectingWebSocket('wss://' + window.location.host + '/ws', [localStorage.getItem('Token')])
backend:
from django.contrib.auth.models import AnonymousUser
from rest_framework.authtoken.models import Token
from channels.db import database_sync_to_async
from channels.middleware import BaseMiddleware
@database_sync_to_async
def get_user(token_key):
    """Return the active user owning the DRF token ``token_key``.

    Args:
        token_key: Key of a ``rest_framework.authtoken`` token.

    Returns:
        The token's user, or ``AnonymousUser()`` when no such token exists
        or when its user is inactive.
    """
    try:
        # select_related loads the user in the same query instead of issuing
        # a second one when token.user is read.
        token = Token.objects.select_related('user').get(key=token_key)
    except Token.DoesNotExist:
        return AnonymousUser()
    # A deactivated account must not authenticate just because its token row
    # still exists.
    return token.user if token.user.is_active else AnonymousUser()
class TokenAuthMiddleware(BaseMiddleware):
    """Populate ``scope['user']`` from the ``sec-websocket-protocol`` header.

    The client passes its token as the WebSocket sub-protocol so that it does
    not appear in the query string. Connections without a usable header value
    are given an ``AnonymousUser``.
    """

    def __init__(self, inner):
        super().__init__(inner)

    async def __call__(self, scope, receive, send):
        """Authenticate the connection, then delegate to the wrapped app."""
        # Use .get(): a missing header used to raise KeyError, which the old
        # `except ValueError` did not catch, so the connection crashed
        # instead of falling back to an anonymous user.
        raw_value = dict(scope.get('headers', ())).get(b'sec-websocket-protocol')
        try:
            token_key = raw_value.decode('utf-8') if raw_value else None
        except UnicodeDecodeError:
            token_key = None

        scope['user'] = AnonymousUser() if token_key is None else await get_user(token_key)
        return await super().__call__(scope, receive, send)
Hi there! You are welcome to join the discussion about BaseAuthTokenMiddleware. That middleware provides the base logic for using auth tokens.
Hi there! I added another token-based JWT authentication for Django Channels that supports both headers and query_params; just click here.
front:
new WebSocket(`ws://8000/{your_path}?token=${localStorage.getItem('token')}`)