Skip to content

Instantly share code, notes, and snippets.

@rluts
Last active October 13, 2023 20:56
Show Gist options
  • Save rluts/22e05ed8f53f97bdd02eafdf38f3d60a to your computer and use it in GitHub Desktop.
Save rluts/22e05ed8f53f97bdd02eafdf38f3d60a to your computer and use it in GitHub Desktop.
Token authorization middleware for Django Channels 2
from channels.auth import AuthMiddlewareStack
from rest_framework.authtoken.models import Token
from django.contrib.auth.models import AnonymousUser
from django.db import close_old_connections
class TokenAuthMiddleware:
"""
Token authorization middleware for Django Channels 2
"""
def __init__(self, inner):
self.inner = inner
def __call__(self, scope):
headers = dict(scope['headers'])
if b'authorization' in headers:
try:
token_name, token_key = headers[b'authorization'].decode().split()
if token_name == 'Token':
token = Token.objects.get(key=token_key)
scope['user'] = token.user
close_old_connections()
except Token.DoesNotExist:
scope['user'] = AnonymousUser()
return self.inner(scope)
TokenAuthMiddlewareStack = lambda inner: TokenAuthMiddleware(AuthMiddlewareStack(inner))
@agusmakmun
Copy link

To support headers Authorization and token from query string, and also session as well.

from urllib import parse

from rest_framework.authtoken.models import Token
from channels.db import database_sync_to_async
from channels.auth import AuthMiddlewareStack


@database_sync_to_async
def get_user_from_headers_or_queries(scope):
    """
    function to get the `User` object
    from his headers or queries as well.
    :return object of `User` or None
    """
    try:
        headers = dict(scope["headers"])
    except KeyError as error:
        headers = {}
        logger.error(error)

    try:
        params = dict(parse.parse_qsl(scope["query_string"].decode("utf8")))
    except KeyError as error:
        params = {}
        logger.warning(error)

    token_key = None
    token_is_found = False

    if b"authorization" in headers:
        # 1. get from authorization headers
        token_name, token_key = headers[b"authorization"].decode().split()
        if token_name == "Token":  # nosec: B105 (just checking the token name)
            token_is_found = True
    else:
        # 2. get from token params
        token_key = params.get("token")
        token_is_found = True if token_key else False

    if token_is_found:
        try:
            token = Token.objects.get(key=token_key)
            return token.user
        except Token.DoesNotExist:
            pass  # AnonymousUser
    return None


class TokenAuthMiddleware:

    def __init__(self, app):
        # Store the ASGI application we were passed
        self.app = app

    async def __call__(self, scope, receive, send):
        user = await get_user_from_headers_or_queries(scope)
        if user is not None:
            scope["user"] = user
        return await self.app(scope, receive, send)


# Handy shortcut for applying all three layers at once
def TokenAuthMiddlewareStack(inner):
    """
    middleware to support websocket ssh connection
    from both session or by queries
    """
    return TokenAuthMiddleware(AuthMiddlewareStack(inner))

urls.py;

from channels.routing import ProtocolTypeRouter, URLRouter
from channels.security.websocket import AllowedHostsOriginValidator
from yourproject.utils.middleware import TokenAuthMiddlewareStack

application = ProtocolTypeRouter({
    'websocket': AllowedHostsOriginValidator(
        TokenAuthMiddlewareStack(
            URLRouter(...)
        )
    )
})

@ritiksoni00
Copy link

To support headers Authorization and token from query string, and also session as well.

from urllib import parse

from rest_framework.authtoken.models import Token
from channels.db import database_sync_to_async
from channels.auth import AuthMiddlewareStack


@database_sync_to_async
def get_user_from_headers_or_queries(scope):
    """
    function to get the `User` object
    from his headers or queries as well.
    :return object of `User` or None
    """
    try:
        headers = dict(scope["headers"])
    except KeyError as error:
        headers = {}
        logger.error(error)

    try:
        params = dict(parse.parse_qsl(scope["query_string"].decode("utf8")))
    except KeyError as error:
        params = {}
        logger.warning(error)

    token_key = None
    token_is_found = False

    if b"authorization" in headers:
        # 1. get from authorization headers
        token_name, token_key = headers[b"authorization"].decode().split()
        if token_name == "Token":  # nosec: B105 (just checking the token name)
            token_is_found = True
    else:
        # 2. get from token params
        token_key = params.get("token")
        token_is_found = True if token_key else False

    if token_is_found:
        try:
            token = Token.objects.get(key=token_key)
            return token.user
        except Token.DoesNotExist:
            pass  # AnonymousUser
    return None


class TokenAuthMiddleware:

    def __init__(self, app):
        # Store the ASGI application we were passed
        self.app = app

    async def __call__(self, scope, receive, send):
        user = await get_user_from_headers_or_queries(scope)
        if user is not None:
            scope["user"] = user
        return await self.app(scope, receive, send)


# Handy shortcut for applying all three layers at once
def TokenAuthMiddlewareStack(inner):
    """
    middleware to support websocket ssh connection
    from both session or by queries
    """
    return TokenAuthMiddleware(AuthMiddlewareStack(inner))

urls.py;

from channels.routing import ProtocolTypeRouter, URLRouter
from channels.security.websocket import AllowedHostsOriginValidator
from yourproject.utils.middleware import TokenAuthMiddlewareStack

application = ProtocolTypeRouter({
    'websocket': AllowedHostsOriginValidator(
        TokenAuthMiddlewareStack(
            URLRouter(...)
        )
    )
})

first you need to change the order.

# Handy shortcut for applying all three layers at once
def TokenAuthMiddlewareStack(inner):
    """
    middleware to support websocket ssh connection
    from both session or by queries
    """
    return AuthMiddlewareStack(TokenAuthMiddleware(inner)) #<---------- need to change the order. 

and second getting token from a query string is not a good idea.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment