@rluts
Last active October 13, 2023 20:56
Token authorization middleware for Django Channels 2
from channels.auth import AuthMiddlewareStack
from django.contrib.auth.models import AnonymousUser
from django.db import close_old_connections
from rest_framework.authtoken.models import Token


class TokenAuthMiddleware:
    """
    Token authorization middleware for Django Channels 2
    """

    def __init__(self, inner):
        self.inner = inner

    def __call__(self, scope):
        # ASGI headers are (name, value) pairs of bytes with lower-cased names.
        headers = dict(scope['headers'])
        if b'authorization' in headers:
            try:
                token_name, token_key = headers[b'authorization'].decode().split()
                if token_name == 'Token':
                    token = Token.objects.get(key=token_key)
                    scope['user'] = token.user
                    close_old_connections()
            except (Token.DoesNotExist, ValueError):
                # Unknown token, or a header that is not exactly "<scheme> <key>".
                scope['user'] = AnonymousUser()
        return self.inner(scope)


TokenAuthMiddlewareStack = lambda inner: TokenAuthMiddleware(AuthMiddlewareStack(inner))
@rluts
Author

rluts commented Jul 4, 2018

Hi, @biozz. Are you using JavaScript on the client side?
This middleware is meant for Token authorization, not Basic. I have used this method in mobile backends and it works fine.

https://stackoverflow.com/questions/4361173/http-headers-in-websockets-client-api

There is no method in the JavaScript WebSockets API for specifying additional headers for the client/browser to send.
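
For clients that can set request headers (mobile apps, scripts), the header shows up in the ASGI scope as pairs of bytes. The following is only a sketch of the extraction step, kept free of Django so it can be run on its own; the function name and the sample scope are made up for illustration.

```python
from typing import Optional


def token_from_authorization_header(scope: dict) -> Optional[str]:
    """Return the key from an 'Authorization: Token <key>' header, else None."""
    # ASGI delivers headers as (name, value) byte pairs with lower-cased names.
    headers = dict(scope.get("headers", []))
    raw = headers.get(b"authorization")
    if raw is None:
        return None
    try:
        parts = raw.decode("ascii").split()
    except UnicodeDecodeError:
        return None
    # Anything that is not exactly "Token <key>" (e.g. Basic auth) is ignored.
    if len(parts) != 2 or parts[0] != "Token":
        return None
    return parts[1]


# Hypothetical scope, shaped like what a non-browser client would produce.
sample_scope = {"headers": [(b"host", b"example.com"), (b"authorization", b"Token abc123")]}
print(token_from_authorization_header(sample_scope))
```

The returned key would then be looked up with `Token.objects.get(key=...)` as in the gist.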

@opahopa

opahopa commented Jul 30, 2018

Hi @rluts, I'm also wondering whether you can explain how you send the authorization headers for your middleware example.
I'm using JS (Angular 6) cli with django-rest / django-rest-jwt.

Following this topic: https://stackoverflow.com/questions/4361173/http-headers-in-websockets-client-api
It seems like in my case there is an issue with both options:

  1. Sending a token via Sec-WebSocket-Protocol requires specifying the protocol in the Django Channels settings, which is not possible because the token is dynamic.
  2. My API, which implements the websocket server, is on a different domain, so I can't pass the token via cookies.

@rluts
Author

rluts commented Sep 5, 2018

Hi, @opahopa. If you open the websocket connection from JS, you can send the token in a GET parameter.

  1. Use this middleware https://github.com/rluts/drf-messaging/blob/master/drf_messaging/token_auth.py
  2. Connect to websocket http://your_websocket/?token=XXX
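
On the server side, the `?token=XXX` part arrives in the scope as a raw byte string. A minimal sketch of pulling the token out of it, assuming the parameter is called `token` as in the URL above (the helper name is hypothetical):

```python
from typing import Optional
from urllib.parse import parse_qs


def token_from_query_string(scope: dict) -> Optional[str]:
    """Return the first 'token' query parameter, or None if it is absent."""
    # ASGI exposes the query string as bytes, without the leading "?".
    query = scope.get("query_string", b"").decode("utf-8", errors="replace")
    # parse_qs maps each name to a list of values and drops blank ones.
    values = parse_qs(query).get("token")
    return values[0] if values else None


print(token_from_query_string({"query_string": b"token=XXX"}))
```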

@Ken4scholars

Is anyone actually using this code? Still trying to figure out how to send the token in the auth header without using query strings.
@rluts, the link to SO you sent has no answer to that. Only if using basic authentication will the header be sent correctly

@mCii

mCii commented Mar 16, 2019

Quoting @rluts: "If you use websocket connection via js, you can send the token in a GET param" (use the middleware at https://github.com/rluts/drf-messaging/blob/master/drf_messaging/token_auth.py and connect to http://your_websocket/?token=XXX).

Hi, it is not secure to pass the token in the query parameters. I wish I could suggest another solution, but it seems like this is the only way to go.
Maybe we can at least add some sort of encryption to the token and let the server know how to decrypt the received token, but that is still not secure enough for me.

@ExTexan

ExTexan commented Apr 17, 2019

@rluts, My IDE (Intellij) is complaining about line 28 of token_auth.py...

PEP 8: do not assign a lambda expression, use a def

I'm not sure how to turn that into a def. Can you offer a suggestion?

@PamilerinId

@ExTexan,
Here you go:

def TokenAuthMiddlewareStack(inner):
    return TokenAuthMiddleware(AuthMiddlewareStack(inner))

@ExTexan

ExTexan commented Apr 22, 2019

@PamilerinId, thanks for that!

@ricksore

Hi ! Just starting on Django Channels. Why can't we just inherit the AuthMiddlewareStack so we do not have to do the lambda ?

@Razz21

Razz21 commented Jul 21, 2019

Hi, to send token via Websocket you can use cookie header:

document.cookie = 'authorization=' + authToken + ';' 

and then __call__ method would look like this:

def __call__(self, scope):
    headers = dict(scope["headers"])
    # Header values are bytes; a missing Cookie header becomes an empty string.
    cookies = headers.get(b"cookie", b"").decode()
    # Stop at the next ";" so later cookies are not captured (needs `import re`).
    match = re.search(r"(?:^|;\s*)authorization=([^;]*)", cookies)
    if match and match.group(1):
        try:
            token = Token.objects.get(key=match.group(1))
            scope["user"] = token.user
            close_old_connections()
        except Token.DoesNotExist:
            scope["user"] = AnonymousUser()

    return self.inner(scope)
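
As an alternative to a hand-written regular expression, the standard library can parse the Cookie header. This is only a sketch; it assumes the cookie is named `authorization` as in the snippet above, and the helper name is invented for the example.

```python
from http.cookies import CookieError, SimpleCookie
from typing import Optional


def token_from_cookie_header(scope: dict, name: str = "authorization") -> Optional[str]:
    """Return the value of the named cookie from the ASGI scope, else None."""
    headers = dict(scope.get("headers", []))
    raw = headers.get(b"cookie")
    if raw is None:
        return None
    jar = SimpleCookie()
    try:
        jar.load(raw.decode("latin-1"))
    except CookieError:
        return None
    morsel = jar.get(name)
    return morsel.value if morsel is not None and morsel.value else None


sample = {"headers": [(b"cookie", b"theme=dark; authorization=abc123; lang=en")]}
print(token_from_cookie_header(sample))
```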

@prokher

prokher commented Aug 27, 2019

I would suggest adding more checks, practically just copy Django REST framework standard behavior. Something like the following:

import django.db
from django.utils.translation import gettext_lazy as _
import rest_framework.authtoken.models
import rest_framework.exceptions

class TokenAuthMiddleware:
    """Token authorization middleware for Channels.

    Authenticate the connection by the header 'Authorization: Token...'
    using Django REST framework token-based authentication.
    """

    def __init__(self, inner):
        """Save given inner middleware to invoke in the `__call__`."""
        self._inner = inner

    def __call__(self, scope):
        """Add user to the scope by 'Authorization: Token...' header."""

        # This function carefully and creatively copied from Django REST
        # framework implementation `TokenAuthentication` class.

        # Only handle "Authorization" headers starting with "Token".
        headers = dict(scope["headers"])
        if b"authorization" not in headers:
            return self._inner(scope)
        auth_header = headers[b"authorization"].split()
        if not auth_header or auth_header[0].lower() != "token".encode():
            return self._inner(scope)

        # Check header correctness. Since we use Django REST framework
        # for token-based authentication, we raise its exceptions.
        AuthError = rest_framework.exceptions.AuthenticationFailed
        if len(auth_header) == 1:
            raise AuthError(_("Invalid token header: no credentials provided!"))
        if len(auth_header) > 2:
            raise AuthError(_("Invalid token header: token string contains spaces!"))
        try:
            auth_header_token = auth_header[1].decode()
        except UnicodeError:
            raise AuthError(_("Invalid token header: token contains invalid symbols!"))

        # According to the warning in the Channels authentication docs
        # we have to manually close old database connections to prevent
        # usage of timed out connections.
        django.db.close_old_connections()

        # Find a user by the token.
        Token = rest_framework.authtoken.models.Token
        try:
            token = Token.objects.select_related("user").get(key=auth_header_token)
        except Token.DoesNotExist:
            raise AuthError(_("Invalid token!"))
        if not token.user.is_active:
            raise AuthError(_("User is inactive!"))

        # Call inner middleware with a user in the scope.
        return self._inner(dict(scope, user=token.user))

@Kannanravindran

Kannanravindran commented Feb 4, 2020

If any of you ended up here and are using Django 3.0: this won't work out of the box. The DB access needs to live in a separate function wrapped with the database_sync_to_async decorator.

Here is the code snippet.

# myproject.myapi.utils.py
from channels.auth import AuthMiddlewareStack
from channels.db import database_sync_to_async
from django.contrib.auth.models import AnonymousUser

from rest_framework.authtoken.models import Token


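@database_sync_to_async
def get_user(headers):
    try:
        token_name, token_key = headers[b'authorization'].decode().split()
        if token_name == 'Token':
            token = Token.objects.get(key=token_key)
            return token.user
    except (Token.DoesNotExist, ValueError):
        # Unknown token, or a header that is not exactly "<scheme> <key>".
        pass
    # Also reached for any other scheme, so scope['user'] is never None.
    return AnonymousUser()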


class TokenAuthMiddleware:

    def __init__(self, inner):
        self.inner = inner

    def __call__(self, scope):
        return TokenAuthMiddlewareInstance(scope, self)


class TokenAuthMiddlewareInstance:
    """
    Yeah, this is black magic:
    https://github.com/django/channels/issues/1399
    """
    def __init__(self, scope, middleware):
        self.middleware = middleware
        self.scope = dict(scope)
        self.inner = self.middleware.inner

    async def __call__(self, receive, send):
        headers = dict(self.scope['headers'])
        if b'authorization' in headers:
            self.scope['user'] = await get_user(headers)
        inner = self.inner(self.scope)
        return await inner(receive, send)


TokenAuthMiddlewareStack = lambda inner: TokenAuthMiddleware(AuthMiddlewareStack(inner))

credits: stackoverflow
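
The reason for the decorator is that Django 3.0 started refusing ORM calls made from a running event loop, so the blocking lookup has to run in a worker thread. The sketch below shows the same idea with the standard library only; `FAKE_TOKENS` and `blocking_lookup` are stand-ins for the database, and `database_sync_to_async` additionally takes care of cleaning up database connections, which this toy version does not.

```python
import asyncio
import time

# Stand-in for the token table; a real lookup would hit the database.
FAKE_TOKENS = {"abc123": "alice"}


def blocking_lookup(token_key: str) -> str:
    time.sleep(0.05)  # simulates a blocking database round trip
    return FAKE_TOKENS.get(token_key, "anonymous")


async def get_user(token_key: str) -> str:
    # Run the blocking call in a worker thread so the event loop stays free.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, blocking_lookup, token_key)


async def main() -> list:
    # Both lookups overlap instead of running back to back.
    return await asyncio.gather(get_user("abc123"), get_user("nope"))


results = asyncio.run(main())
print(results)
```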

@astrikov-d

astrikov-d commented Feb 12, 2020

Thanks for the snippet, but I do not understand how you would use it, since it's not possible to send an Authorization header along with a websocket connection from the browser. IMO, the most suitable way is to pass the token via query params (possibly encoded with some salt/ticket) and then catch it inside the consumer:

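from urllib.parse import parse_qs

from asgiref.sync import async_to_sync
from channels.generic.websocket import WebsocketConsumer
from rest_framework.authtoken.models import Token


class WebsocketConsumerWithUser(WebsocketConsumer):
    group_name = 'test'

    def get_user(self):
        user = self.scope['user']
        if not user.is_anonymous:
            # Already authenticated by an outer middleware (e.g. session auth).
            return user

        params = parse_qs(self.scope['query_string'].decode('utf8'))
        token_key = params.get('token', (None,))[0]
        # A missing or unknown token yields None, so connect() rejects the socket.
        token = Token.objects.filter(key=token_key).select_related('user').first()
        return token.user if token else None

    def connect(self):
        user = self.get_user()
        if not user:
            self.close(code=403)
            return

        async_to_sync(self.channel_layer.group_add)(self.group_name, self.channel_name)
        self.accept()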

It's a more or less secure approach if you are using the https/wss protocols. The only concern I see is that if your web server access logs are compromised, your tokens will be exposed to a third party.
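
One way to limit the damage from leaked logs is the "ticket" idea mentioned above: put a short-lived signed value in the URL instead of the long-lived token. The following is a sketch using only the standard library; the secret, the ticket layout (`user_id.expires.signature`) and the function names are all assumptions made for illustration, and the ticket would have to be issued over a normal authenticated HTTPS request first.

```python
import hashlib
import hmac
import time
from typing import Optional

SECRET = b"change-me"  # hypothetical server-side secret


def _sign(payload: str) -> str:
    return hmac.new(SECRET, payload.encode(), hashlib.sha256).hexdigest()


def make_ticket(user_id: int, ttl: int = 30, now: Optional[float] = None) -> str:
    """Build 'user_id.expires.signature', valid for ttl seconds."""
    expires = int((time.time() if now is None else now) + ttl)
    payload = f"{user_id}.{expires}"
    return f"{payload}.{_sign(payload)}"


def verify_ticket(ticket: str, now: Optional[float] = None) -> Optional[int]:
    """Return the user id for a valid, unexpired ticket, else None."""
    try:
        user_id, expires, sig = ticket.split(".")
        expected = _sign(f"{user_id}.{expires}")
        # Constant-time comparison on bytes avoids leaking the signature.
        if not hmac.compare_digest(sig.encode(), expected.encode()):
            return None
        if int(expires) < (time.time() if now is None else now):
            return None
        return int(user_id)
    except ValueError:
        return None


ticket = make_ticket(7, ttl=30, now=1000)
print(verify_ticket(ticket, now=1010))
```

A ticket that expires within seconds is of little use to someone who reads it from an access log later, which is the exposure described above.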

@Kannanravindran

@astrikov-d As mentioned in earlier comments, sending the token and possibly the email address/username via query_params is not the best way. Instead, you can send it through sec-websocket-protocol. Here is a sample script which does that.

var endpoint  = "wss://api.myawesomesite.com/ws/<channel>/"
var socket = new WebSocket(endpoint, ["Authorization", "Token:82kasnciasdi...93hncasir"])

You would receive the values in a list, from which you can extract the token by modifying the get_user method.
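
A sketch of that extraction, assuming the list arrives under the `subprotocols` key of the websocket scope and that the client sent the entries in the `"Token:<key>"` form shown above (the helper name is hypothetical):

```python
from typing import Optional


def token_from_subprotocols(scope: dict) -> Optional[str]:
    """Return the key from a 'Token:<key>' subprotocol entry, else None."""
    # For websocket connections the scope carries the offered subprotocols
    # as a list of strings.
    for entry in scope.get("subprotocols", []):
        scheme, sep, key = entry.partition(":")
        if sep and scheme == "Token" and key:
            return key
    return None


sample = {"subprotocols": ["Authorization", "Token:abc123"]}
print(token_from_subprotocols(sample))
```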

@astrikov-d

astrikov-d commented Feb 13, 2020

@Kannanravindran, thanks. As I understand it, the second argument will be passed as the value of the Sec-WebSocket-Protocol header. According to the WS specification, this header should be used to define the format of the data, for example SOAP, WAMP, etc. I don't think passing custom values like Authorization or Token in this header is a good idea; it looks like a workaround. So far I do not see any security concerns with passing the token via query params if the app uses SSL.

@Kannanravindran

@astrikov-d I was thinking that would be much easier too. But as mentioned in a few comments above, it's not secure to send an auth/session token via query params, since they are visible in server logs (I guess). Well, there is the third option of using cookies. Take a look at this

@noobmaster19

noobmaster19 commented Apr 24, 2020

Hello, sorry for the interruption. I have successfully implemented the cookie method raised here.
I'm pretty new to web development and wanted to know if this method is secure.

@Kannanravindran

@neowenshun If the webpage you are using it on is served over SSL and the cookie has the HttpOnly flag set, it should be a secure implementation.
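
Note that an HttpOnly cookie has to come from a server response, because script cannot set that flag through `document.cookie`. A sketch of building such a header value with the standard library, assuming the cookie name `authorization` from the earlier comment (the function name is made up):

```python
from http.cookies import SimpleCookie


def build_auth_cookie(token_key: str) -> str:
    """Return a Set-Cookie header value with HttpOnly and Secure set."""
    cookie = SimpleCookie()
    cookie["authorization"] = token_key
    cookie["authorization"]["httponly"] = True  # hidden from page scripts
    cookie["authorization"]["secure"] = True    # only sent over https/wss
    cookie["authorization"]["path"] = "/"
    # OutputString() gives the value only, without the "Set-Cookie:" prefix.
    return cookie["authorization"].OutputString()


header_value = build_auth_cookie("abc123")
print(header_value)
```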

@dphans

dphans commented Sep 2, 2020

I still pass the token as a query param, authenticating with django-rest-framework-simplejwt:

from urllib import parse

from channels.auth import AuthMiddlewareStack
from channels.db import database_sync_to_async
from django.contrib.auth.models import AnonymousUser
from django.db import close_old_connections
from rest_framework_simplejwt.exceptions import TokenError
from rest_framework_simplejwt.settings import api_settings
from rest_framework_simplejwt.tokens import AccessToken

from Account.models import User  # project-specific user model


@database_sync_to_async
def get_user(**kwargs):
    try:
        return User.objects.get(**kwargs)
    except User.DoesNotExist:
        return AnonymousUser()


class JwtAuthMiddleware:

    def __init__(self, inner):
        self.inner = inner

    def __call__(self, scope):
        return JwtAuthMiddlewareInstance(scope, self)


class JwtAuthMiddlewareInstance:

    def __init__(self, scope, middleware):
        self.middleware = middleware
        self.scope = dict(scope)
        self.inner = self.middleware.inner

    async def __call__(self, receive, send):
        close_old_connections()

        # Keep a user that is already present and active in the scope.
        user = self.scope.get('user')
        if not (user and user.is_active):
            user = await self.get_user_from_query_string()

        # Channels 2 calling convention: inner(scope) returns the instance.
        inner = self.inner(dict(self.scope, user=user))
        return await inner(receive, send)

    async def get_user_from_query_string(self):
        try:
            query_dict = parse.parse_qs(self.scope["query_string"].decode('utf-8'))
        except UnicodeDecodeError:
            return AnonymousUser()

        # parse_qs maps each key to a list of values; take the first token.
        raw_token = query_dict.get('token', [''])[0]
        if not raw_token:
            return AnonymousUser()

        try:
            token_decoded = AccessToken(raw_token)
        except TokenError:
            # Malformed, expired or wrongly signed token.
            return AnonymousUser()

        return await self.get_user(validated_token=token_decoded)

    async def get_user(self, validated_token):
        try:
            user_id = validated_token[api_settings.USER_ID_CLAIM]
        except KeyError:
            return AnonymousUser()

        try:
            user = await get_user(**{api_settings.USER_ID_FIELD: user_id})
        except Exception:
            return AnonymousUser()

        if not user.is_active:
            return AnonymousUser()

        return user


JwtAuthMiddlewareStack = lambda inner: JwtAuthMiddleware(AuthMiddlewareStack(inner))

@mwibutsa

Hello, thank you all for your awesome answers. Sadly, I am using django-all-auth for social media authentication, and it only works with djangorestframework-jwt. I also want to use it with Django Channels, but I can't figure out how to create that custom authentication middleware. If anyone has faced the same issue, I would like to know how they handled it.

@dmwyatt

dmwyatt commented Sep 28, 2020

I created a middleware that uses the websocket subprotocol. Rather than rolling my own authentication, I subclassed and customized JSONWebTokenAuthentication from the django-rest-framework-jwt package to get the JWT from the scope rather than from the HTTP request headers.

https://gist.github.com/dmwyatt/5cf7e5102ed0a01b7d38aabf322e03b2

@debdutgoswami

I am absolutely new to django-channels. I am using the query-param simplejwt middleware that @dphans posted above, and it is returning an error.

    inner = self.inner(dict(self.scope, user=AnonymousUser()))
TypeError: __call__() missing 2 required positional arguments: 'receive' and 'send'

Can anyone point to the right direction?
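
That TypeError is what you would expect when middleware written in the Channels 2 style, which calls `inner(scope)` and then the result with `(receive, send)`, wraps an application that takes `scope, receive, send` in a single call, which is the convention Channels 3 uses. Below is a toy sketch of the single-call style; `inner_app`, the `seen_users` list and the placeholder user are invented for the example and stand in for a real consumer and real authentication.

```python
import asyncio

seen_users = []


async def inner_app(scope, receive, send):
    # Stand-in for the wrapped consumer: takes all three arguments at once.
    seen_users.append(scope.get("user"))
    await send({"type": "websocket.accept"})


class SingleCallableMiddleware:
    """Single-call style: scope, receive and send arrive together."""

    def __init__(self, inner):
        self.inner = inner

    async def __call__(self, scope, receive, send):
        scope = dict(scope, user="anonymous")  # placeholder for real auth
        return await self.inner(scope, receive, send)


async def demo():
    sent = []

    async def receive():
        return {"type": "websocket.connect"}

    async def send(message):
        sent.append(message)

    await SingleCallableMiddleware(inner_app)({"type": "websocket"}, receive, send)
    return sent


messages = asyncio.run(demo())
print(seen_users, messages)
```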

@ajaykarthikr

Ever since I upgraded to Channels 3.0, the Django 3.0 snippet that @Kannanravindran posted above isn't working anymore. Can anyone help?

@AliRn76

AliRn76 commented Dec 24, 2020

@ajaykarthikr
If you are using Channels 3, you can use this snippet:
(Token authorization middleware for Django Channels 3)

https://gist.github.com/AliRn76/1fb99688315bedb2bf32fc4af0e50157

@goatwu1993

Thanks for sharing. Below is simplejwt code that works for me.

https://gist.github.com/goatwu1993/1105108e71b6a138168a2e9d160b357d

django==3.1.4
djangorestframework_simplejwt==4.6.0
channels==3.0.3

I also replaced the lambda with a def to pass the flake8 check.

@nasir733

nasir733 commented Mar 22, 2021

This is the code that worked for me for JWT auth in Channels 3:

"""General web socket middlewares
"""

from channels.db import database_sync_to_async
from django.contrib.auth import get_user_model
from django.contrib.auth.models import AnonymousUser
from rest_framework_simplejwt.exceptions import InvalidToken, TokenError
from rest_framework_simplejwt.tokens import UntypedToken
from rest_framework_simplejwt.authentication import JWTTokenUserAuthentication
from rest_framework_simplejwt.state import User
from channels.middleware import BaseMiddleware
from channels.auth import AuthMiddlewareStack
from django.db import close_old_connections
from urllib.parse import parse_qs
from jwt import decode as jwt_decode
from django.conf import settings


@database_sync_to_async
def get_user(validated_token):
    try:
        return get_user_model().objects.get(id=validated_token["user_id"])
    except User.DoesNotExist:
        return AnonymousUser()


class JwtAuthMiddleware(BaseMiddleware):
    def __init__(self, inner):
        self.inner = inner

    async def __call__(self, scope, receive, send):
        # Close old database connections to prevent usage of timed out connections
        close_old_connections()

        # Get the token; a missing "token" parameter becomes "" and fails validation below
        token = parse_qs(scope["query_string"].decode("utf8")).get("token", [""])[0]

        # Try to authenticate the user
        try:
            # This will automatically validate the token and raise an error if token is invalid
            UntypedToken(token)
        except (InvalidToken, TokenError) as e:
            # Token is invalid: stop here without calling the inner application
            print(e)
            return None
        else:
            # The token is valid, decode it.
            # Assumes simplejwt's defaults: signed with SECRET_KEY using HS256.
            decoded_data = jwt_decode(token, settings.SECRET_KEY, algorithms=["HS256"])
            # Will return a dictionary like -
            # {
            #     "token_type": "access",
            #     "exp": 1568770772,
            #     "jti": "5c15e80d65b04c20ad34d77b6703251b",
            #     "user_id": 6
            # }

            # Get the user using ID
            scope["user"] = await get_user(validated_token=decoded_data)
        return await super().__call__(scope, receive, send)


def JwtAuthMiddlewareStack(inner):
    return JwtAuthMiddleware(AuthMiddlewareStack(inner))


Cheers. I didn't remove the useless imports; I just found the solution and am sharing it with you.

@alex-pobeditel-2004

If anyone else needs it, here is a version of @dmwyatt's gist that supports Channels 3.0 with rest_framework_simplejwt:
https://gist.github.com/alex-pobeditel-2004/5098bac720c4eeb79052b7234346f52d

@manupatel007

For those who are stuck on the client-side code and on how this middleware connects to the URL configuration, the following blog post can help: https://hashnode.com/post/using-django-drf-jwt-authentication-with-django-channels-cjzy5ffqs0013rus1yb9huxvl

@agusmakmun

To support the Authorization header, a token in the query string, and the session as well:

import logging
from urllib import parse

from rest_framework.authtoken.models import Token
from channels.db import database_sync_to_async
from channels.auth import AuthMiddlewareStack

logger = logging.getLogger(__name__)


@database_sync_to_async
def get_user_from_headers_or_queries(scope):
    """
    function to get the `User` object
    from his headers or queries as well.
    :return object of `User` or None
    """
    try:
        headers = dict(scope["headers"])
    except KeyError as error:
        headers = {}
        logger.error(error)

    try:
        params = dict(parse.parse_qsl(scope["query_string"].decode("utf8")))
    except KeyError as error:
        params = {}
        logger.warning(error)

    token_key = None
    token_is_found = False

    if b"authorization" in headers:
        # 1. get from authorization headers
        try:
            token_name, token_key = headers[b"authorization"].decode().split()
        except ValueError:
            # header is not exactly "<name> <key>"
            token_name = None
        if token_name == "Token":  # nosec: B105 (just checking the token name)
            token_is_found = True
    else:
        # 2. get from token params
        token_key = params.get("token")
        token_is_found = True if token_key else False

    if token_is_found:
        try:
            token = Token.objects.get(key=token_key)
            return token.user
        except Token.DoesNotExist:
            pass  # AnonymousUser
    return None


class TokenAuthMiddleware:

    def __init__(self, app):
        # Store the ASGI application we were passed
        self.app = app

    async def __call__(self, scope, receive, send):
        user = await get_user_from_headers_or_queries(scope)
        if user is not None:
            scope["user"] = user
        return await self.app(scope, receive, send)


# Handy shortcut for applying all three layers at once
def TokenAuthMiddlewareStack(inner):
    """
    middleware to support websocket ssh connection
    from both session or by queries
    """
    return TokenAuthMiddleware(AuthMiddlewareStack(inner))

urls.py;

from channels.routing import ProtocolTypeRouter, URLRouter
from channels.security.websocket import AllowedHostsOriginValidator
from yourproject.utils.middleware import TokenAuthMiddlewareStack

application = ProtocolTypeRouter({
    'websocket': AllowedHostsOriginValidator(
        TokenAuthMiddlewareStack(
            URLRouter(...)
        )
    )
})

@ritiksoni00

Regarding @agusmakmun's snippet above:

First, you need to change the order of the middleware:

# Handy shortcut for applying all three layers at once
def TokenAuthMiddlewareStack(inner):
    """
    middleware to support websocket ssh connection
    from both session or by queries
    """
    return AuthMiddlewareStack(TokenAuthMiddleware(inner)) #<---------- need to change the order. 

Second, getting the token from a query string is not a good idea.
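
Why the order matters can be seen with toy middleware: the outermost wrapper runs first, and whichever layer runs last before the consumer decides what ends up in `scope["user"]`. The sketch below uses invented stand-ins (`make_middleware`, `session_auth`, `token_auth`), not the real `AuthMiddlewareStack`, which has its own rules for populating the user.

```python
import asyncio

calls = []


def make_middleware(name, user=None):
    """Build a toy single-callable middleware that logs when it runs."""

    def wrap(inner):
        async def app(scope, receive, send):
            calls.append(name)
            if user is not None:
                scope = dict(scope, user=user)
            return await inner(scope, receive, send)

        return app

    return wrap


async def consumer(scope, receive, send):
    calls.append(f"consumer sees {scope.get('user')}")


session_auth = make_middleware("session", user="anonymous")
token_auth = make_middleware("token", user="alice")

# session_auth is outermost, so it runs first; token_auth runs second
# and its value for "user" is the one the consumer receives.
asyncio.run(session_auth(token_auth(consumer))({}, None, None))
print(calls)
```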
