Skip to content

Instantly share code, notes, and snippets.

@roymanigley
Last active December 25, 2023 13:06
Show Gist options
  • Save roymanigley/d60771804d6ff6de0c3ac49eaf64f4fa to your computer and use it in GitHub Desktop.
Save roymanigley/d60771804d6ff6de0c3ac49eaf64f4fa to your computer and use it in GitHub Desktop.
Django Rest Framework Simple JWT setting tokens in cookie

Django Rest Framework Simple JWT

Installation

# Create the project directory and an isolated environment first, so that
# `django-admin` is available from the freshly installed Django package.
mkdir simple_jwt
cd simple_jwt

python -m venv .env
source .env/bin/activate

pip install django djangorestframework djangorestframework-simplejwt

# The trailing "." generates manage.py and the `simple_jwt` package in the
# current directory, giving the same layout as `startproject simple_jwt`.
django-admin startproject simple_jwt .
./manage.py startapp simple_jwt_app

Configuration

simple_jwt/settings.py

INSTALLED_APPS = [
    ...
    'rest_framework_simplejwt',
    'rest_framework_simplejwt.token_blacklist',
    'rest_framework',
    'simple_jwt_app'
    ...
]

REST_FRAMEWORK = {
    'DEFAULT_RENDERER_CLASSES': [
        'rest_framework.renderers.JSONRenderer',
        'rest_framework.renderers.BrowsableAPIRenderer',
    ],
    'DEFAULT_PERMISSION_CLASSES': [
        'rest_framework.permissions.DjangoModelPermissions',
    ],
    'DEFAULT_AUTHENTICATION_CLASSES': (
        'rest_framework_simplejwt.authentication.JWTAuthentication',
    ),
}

MIDDLEWARE = [
    ...
    'simple_jwt.middleware.AuthorizationMiddleware',
    ...
]

simple_jwt/urls.py

from rest_framework_simplejwt.views import TokenObtainPairView, TokenRefreshView
from rest_framework.routers import DefaultRouter
from app.views import PersonView

urlpatterns = [
    ...
    path('api/token/', TokenObtainPairView.as_view(), name='token_obtain_pair'),
    path('api/token/refresh/', TokenRefreshView.as_view(), name='token_refresh'),
    ...
]

router = DefaultRouter()
router.register('api/person', PersonView, basename='Person')

urlpatterns += router.urls

Custom implementation

simple_jwt_app/models.py

from django.db import models

# Minimal model used to demonstrate the protected REST endpoint.
class Person(models.Model):
    # Required text column holding the person's name (at most 255 characters).
    name = models.CharField(max_length=255, null=False)

simple_jwt_app/views.py

from rest_framework import serializers
from rest_framework.viewsets import ModelViewSet
from .models import Person
from rest_framework.permissions import DjangoModelPermissions, BasePermission
from rest_framework import exceptions

# Serializer that maps Person instances to and from their API representation.
class PersonSerializer(serializers.ModelSerializer):

    class Meta:
        # Expose every field declared on the Person model.
        fields = '__all__'
        model = Person


# Viewset over all Person rows, registered on the router in simple_jwt/urls.py.
# (A `#` comment is used on purpose: DRF shows a view docstring in the
# browsable API, which would change the rendered page.)
class PersonView(ModelViewSet):
    queryset = Person.objects.all()
    serializer_class = PersonSerializer

simple_jwt/middleware.py

import base64
import json
from datetime import datetime
from rest_framework_simplejwt.serializers import TokenRefreshSerializer, TokenObtainSerializer 
from rest_framework_simplejwt.tokens import RefreshToken
from django.contrib.auth.models import User
from rest_framework_simplejwt.token_blacklist.models import BlacklistedToken, OutstandingToken
from django.http.response import HttpResponseForbidden

# Names of the cookies in which the middleware reads and writes the two JWTs.
ACCESS_TOKEN_KEY = 'access_token'
REFRESH_TOKEN_KEY = 'refresh_token'

# Dotted "<module>.<view name>" identifiers that
# AuthorizationMiddleware.process_view compares the resolved view against.
# NOTE(review): none of these modules are defined in this project; they look
# like placeholders to be replaced with the project's own public views.
EXCLUDE_FROM_MIDDLEWARE = [
    'Django_Rest_Framework.views.apiIndexView',
    'Django_Rest_Framework.views.IndexView',
    'accounts.views.RegisterView',
    'accounts.views.LoginView',
]

class AuthorizationMiddleware:
    """Bridge JWTs stored in cookies to the ``Authorization`` header.

    On every request the access and refresh tokens are read from the cookies
    named by ``ACCESS_TOKEN_KEY`` / ``REFRESH_TOKEN_KEY``. If the access token
    has expired, the refresh token is validated and a fresh token pair is
    issued. The (possibly renewed) access token is then injected as
    ``Authorization: Bearer <token>`` and both tokens are written back as
    HTTP-only cookies on the response.
    """

    def __init__(self, get_response=None):
        self.get_response = get_response

    def process_view(self, request, view_func, view_args, view_kwargs):
        """Look up the resolved view in ``EXCLUDE_FROM_MIDDLEWARE``.

        NOTE(review): this hook returns ``None`` on every path, so it never
        changes request handling. The token logic lives in ``__call__`` and
        runs before Django invokes this hook, which means the exclusion list
        currently exempts no view.
        """
        view_name = '.'.join((view_func.__module__, view_func.__name__))
        if view_name in EXCLUDE_FROM_MIDDLEWARE:
            return None
        return None

    def __call__(self, request):
        """Process one request/response cycle.

        Returns a 403 JSON response when the access-token cookie cannot be
        decoded, when an expired access token cannot be renewed because the
        refresh token is missing or invalid, or when the token's user no
        longer exists. Otherwise returns the downstream response.
        """
        access_token = request.COOKIES.get(ACCESS_TOKEN_KEY)
        refresh_token = request.COOKIES.get(REFRESH_TOKEN_KEY)
        if access_token:
            try:
                expired = self.is_token_expired(access_token)
            except (ValueError, KeyError, TypeError) as e:
                # A garbage cookie must not turn into a 500 error.
                return self._forbidden(f'Malformed access token: {e}')
            if expired:
                ser = TokenRefreshSerializer(data={'refresh': refresh_token})
                try:
                    ser.is_valid(raise_exception=True)
                except Exception as e:
                    # Boundary handler: any validation/token error means the
                    # session cannot be renewed.
                    return self._forbidden(str(e))
                try:
                    # Renew from the refresh token that was just validated.
                    # The expired access token's signature is never checked
                    # here, so its claims must not decide which user gets
                    # new tokens.
                    access_token, refresh_token = self.renew_tokens(refresh_token)
                except User.DoesNotExist:
                    return self._forbidden('User not found')
            request.META['HTTP_AUTHORIZATION'] = f'Bearer {access_token}'
        response = self.get_response(request)
        if access_token and refresh_token:
            # NOTE(review): consider `secure=True` / `samesite=...` for
            # production deployments.
            response.set_cookie(ACCESS_TOKEN_KEY, access_token, httponly=True)
            response.set_cookie(REFRESH_TOKEN_KEY, refresh_token, httponly=True)
        return response

    @staticmethod
    def _forbidden(detail):
        """Build the 403 response with a JSON ``{"detail": ...}`` body."""
        return HttpResponseForbidden(
            json.dumps({'detail': detail}),
            content_type='application/json',
        )

    def is_token_expired(self, token) -> bool:
        """Return ``True`` if the token's ``exp`` claim lies in the past.

        The claim is compared as epoch seconds, which avoids the ambiguity of
        naive local datetimes around daylight-saving changes. The signature
        is NOT verified.

        Raises ``ValueError``/``KeyError``/``TypeError`` for malformed tokens.
        """
        import time

        data = self.token_to_dict(token)
        return data['exp'] < time.time()

    def extract_user_id(self, token) -> int:
        """Return the ``user_id`` claim of the token (signature NOT verified)."""
        data = self.token_to_dict(token)
        return data['user_id']

    def token_to_dict(self, token) -> dict:
        """Decode the payload (middle segment) of a JWT without verifying it.

        Raises ``ValueError`` if the token does not consist of exactly three
        dot-separated segments or the payload is not valid base64url/JSON.
        """
        segments = token.split('.')
        if len(segments) != 3:
            raise ValueError('token must have three dot-separated segments')
        payload = segments[1]
        # JWT segments are base64url-encoded with the padding stripped.
        payload += '=' * (-len(payload) % 4)
        return json.loads(base64.urlsafe_b64decode(payload.encode()))

    def renew_tokens(self, token) -> tuple:
        """Blacklist the user's outstanding tokens and issue a new pair.

        ``token`` is a JWT whose ``user_id`` claim identifies the user; the
        caller is responsible for having validated it.

        Returns ``(access_token, refresh_token)`` as encoded strings.
        Raises ``User.DoesNotExist`` if the user cannot be found.
        """
        user_id = self.extract_user_id(token)
        user = User.objects.get(id=user_id)
        already_blacklisted = BlacklistedToken.objects.filter(
            token__user=user,
        ).values_list('token_id', flat=True)
        outstanding_tokens = OutstandingToken.objects.filter(user=user).exclude(
            id__in=already_blacklisted,
        )
        for token_to_blacklist in outstanding_tokens:
            # get_or_create tolerates a concurrent request having blacklisted
            # the same token in the meantime.
            BlacklistedToken.objects.get_or_create(token=token_to_blacklist)
        refresh = RefreshToken.for_user(user)
        return str(refresh.access_token), str(refresh)

Run application

Init DB

# Create migration files for model changes, then apply all pending migrations.
./manage.py makemigrations
./manage.py migrate

Create user

# poc.py logs in with username "admin" and password "admin"; use those
# credentials here or adjust USER_LOGIN / USER_PASSWORD in poc.py.
./manage.py createsuperuser

Start server

# poc.py expects the API to be reachable at http://localhost:8000.
./manage.py runserver

POC

poc.py

import requests
import uuid

# Endpoints of the locally running development server.
URL_TOKEN = 'http://localhost:8000/api/token/'
URL_PERSON = 'http://localhost:8000/api/person/'

# Credentials of the account used to obtain the initial token pair.
USER_LOGIN = 'admin'
USER_PASSWORD = 'admin'

def get_headers(response=None):
    """Build the ``Cookie`` header carrying the access and refresh tokens.

    If ``response`` is a successful response that carries both token cookies,
    those tokens are reused. Otherwise (no response, a failed response, or a
    missing cookie) a new token pair is requested from ``URL_TOKEN`` with the
    configured credentials.

    Raises ``requests.HTTPError`` if the login request is rejected.
    """
    access_token = refresh_token = None
    if response:
        # `.get` returns None for a missing cookie; indexing would raise
        # KeyError and the login fallback below would never be reached.
        access_token = response.cookies.get("access_token")
        refresh_token = response.cookies.get("refresh_token")
    if not (access_token and refresh_token):
        token_response = requests.post(
            URL_TOKEN,
            data={'username': USER_LOGIN, 'password': USER_PASSWORD},
        )
        # Fail with a clear HTTP error on bad credentials, not a KeyError.
        token_response.raise_for_status()
        tokens = token_response.json()
        access_token = tokens['access']
        refresh_token = tokens['refresh']
    return {'Cookie': f'access_token={access_token};refresh_token={refresh_token}'}

# List the existing records; the first call logs in to obtain tokens.
response = requests.get(URL_PERSON, headers=get_headers())
# Stop early on an error response: it would not be a list of records.
response.raise_for_status()
existing_records = response.json()
print(f'[+] {"EXISTING RECORDS":16}:', existing_records)

# Delete every record, reusing the token cookies from the previous response.
for record in existing_records:
    response = requests.delete(f'{URL_PERSON}{record["id"]}/', headers=get_headers(response))
    print(f'[+] {"DELETED":16}:', record)

# Create one new record with a random name.
payload = {"name": str(uuid.uuid4())}
response = requests.post(URL_PERSON, data=payload, headers=get_headers(response))
print(f'[+] {"CREATED":16}:', response.json())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment