Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Star 25 You must be signed in to star a gist
  • Fork 3 You must be signed in to fork a gist
  • Save lordsarcastic/a3681c314b6c00e96e10f10f60fe0ce8 to your computer and use it in GitHub Desktop.
Save lordsarcastic/a3681c314b6c00e96e10f10f60fe0ce8 to your computer and use it in GitHub Desktop.
This is a generic implementation of OTP for Django applications. It can be extended to any framework or platform that uses OTP.
This is a robust implementation that is extensible for anything that requires the use of OTP. Want to use OTP to verify
a user? Check! Want to use OTP to validate an order? CHeck! Want to use OTP to reset a password? CHEck! Want to use
OTP to verify a device? CHECk! Want to use OTP to fight people? CHECK!
I used: Django (framework), Django Rest Framework (a plugin for REST API), PostgreSQL (database)
and email (for sending the otp). You can substitute any of these for whatever you want, like using Redis instead of Postgres.
While this solution is built to be used for Django, I have added comments to explain the process for developers using
other frameworks.
Legend:
- otp.py: Base logic for implementing OTP functionality in applications
- user.py: User model used in OTP example
- utils.models.py: Contains base model for all models
- User verification process.py: An example using OTP to verify a user's account
- Reset password.py: An example using OTP to implement forgot password
from random import randrange
from typing import Any, Dict, Optional, TypedDict, Union
from django.core import signing
from django.core.mail import EmailMultiAlternatives
from django.db import models
from django.utils import timezone
# a user-defined abstract model that includes `uuid`, `created` and `last_updated`
# to track the state of a model. UUID is the primary key of the model.
from utils.models import TrackObjectStateMixin
from .user import User
class CompleteOTPType(TypedDict):
instance: "AuthOTP"
code: int
class CreateUserAuthOTPType(TypedDict):
user_auth_otp: "UserAuthOTP"
complete_otp: CompleteOTPType
class AuthOTP(TrackObjectStateMixin):
# Number of seconds till the OTP expires
TIMEOUT = 5 * 60
# this length should never go below 4
OTP_LENGTH = 6
# this is the actual code sent to the user. it is encrypted
# and signed and not stored as plain text for security purposes
code = models.CharField(max_length=128)
# we use a duration field to store the number of seconds
# the OTP expires from the time of creation, `created`.
timeout = models.DurationField(default=timezone.timedelta(seconds=TIMEOUT))
def __str__(self):
return f"AuthOTP instance at time: {self.created}"
@classmethod
def generate_otp(cls, **extra_fields) -> CompleteOTPType:
"""
The idea behind the generation of an otp or anything secret is simple.
First, we generate a random code: the otp itself.
This value is only visible to the user and cannot be inspected in
any way. To ensure that this process is reversible for verifying
OTPs, we do this:
- Use the uuid of the AuthOTP instance as a salt to guarantee
randomness
- Sign the generated code and strip the code out of the output
- Then we store the result from above as the `instance.code`.
"""
otp: cls = cls(**extra_fields)
# we know this will always be the length `cls.OTP_LENGTH` digits
# so we can slice this out after signing
code = randrange(
10 ** (cls.OTP_LENGTH - 1), (10**cls.OTP_LENGTH) - 1
)
# you can swap this for any encryption format you want. I want to
# stick to core Django
signed_code = signing.TimestampSigner(salt=str(otp.uuid)).sign(code)
# we strip out the characters of the length `cls.OTP_LENGTH`
# this is only because the sigining algorithm from above
# adds the original code generated to the output. Of course we
# don't want that to be available in plain sight :D
otp.code = signed_code[cls.OTP_LENGTH :]
otp.save()
# this is the only point you can ever access the actual OTP code
# If the output is not saved in a variable, it is gone forever
# and ever. and ever. and ever.
return {"instance": otp, "code": code}
@classmethod
def verify_otp(cls, otp_instance: "AuthOTP", code: int) -> bool:
"""
Verification only requires that user can provide the first
4 values of `instance.code` which is the OTP sent to the user's mail.
Without the OTP, it is impossible to verify the OTP even from the
stored instance. This guarantees security and places it in the
hands of the user.
"""
try:
unsigned_code = int(
signing.TimestampSigner(salt=(str(otp_instance.uuid))).unsign(
f"{code}{otp_instance.code}", max_age=otp_instance.timeout
)
)
except (signing.BadSignature, signing.SignatureExpired, IndexError):
return False
return unsigned_code == code
def has_expired(self) -> bool:
# we keep this for jobs to have an API accessible to clear
# expired otps.
return self.created + self.timeout < timezone.now()
class UserAuthOTP(models.Model):
"""
An intermediary table to bind users and otps as otps are meant to be
anonymous. This table handles management of OTPs for various reasons
throughout the codebase. Want to generate OTPs to confirm an order?
Simply add it to the `OTPReasons` enum.
"""
# we expect this table to be used for future OTPs
# in various respect. All the developer has to do
# is add a new `Reason` and do as seen fit.
# for non Django devs: the chars in all caps are
# the actual text stored in the DB.
class OTPReasons(models.TextChoices):
ACTIVATE_USER = "ACTIVATE_USER", "Activate User"
FORGOT_PASSWORD = "FORGOT_PASSWORD", "Forgot password"
# nothing special, the line was simply too long
FORGOT_PASSWORD_TOKEN = (
"FORGOT_PASSWORD_TOKEN",
"Forgot password token",
)
user = models.ForeignKey(User, on_delete=models.CASCADE)
otp = models.ForeignKey(AuthOTP, on_delete=models.CASCADE)
# a way to group OTPs into malleable categories
reason = models.CharField(
max_length=64,
choices=OTPReasons.choices,
default=OTPReasons.ACTIVATE_USER,
)
def __str__(self) -> str:
return f"{self.user} | {self.reason}"
class Meta:
# we add a composite index made up of the User table and the
# specified reason. This means that only one `UserAuthOTP`
# instance can exist for a user per reason. Meanwhile, multiple
# otps may exist with only 0 or 1 valid at any point in time
# for a particular `reason`.
constraints = [
models.UniqueConstraint(
fields=["reason", "user"], name="unique_reason_for_user"
)
]
@classmethod
def create_otp_for_user(
cls, user: User, reason: OTPReasons
) -> CreateUserAuthOTPType:
"""
This is the expected method to be used to create an OTP
for a user. It yields both the auth_otp instance and
the generated code. Again, the generated code will only be
available here. If it isn't stored in a variable, it is
lost forever.
"""
otp = AuthOTP.generate_otp()
try:
# if a `UserAuthOTP` instance already exists for the reason,
# we reuse it.
user_auth_otp = cls.objects.get(user=user, reason=reason)
except cls.DoesNotExist:
# if not, we create one for the reason
user_auth_otp = UserAuthOTP(user=user, reason=reason)
user_auth_otp.otp = otp["instance"]
user_auth_otp.save()
return {"user_auth_otp": user_auth_otp, "complete_otp": otp}
"""
The process of resetting password password is in three stages:
1. Initialize password reset: At this stage, the OTP is created and sent to the email address of the user specified if
the user exists in the database. The OTP sent is of type "FORGOT_PASSWORD".
2. Receive OTP from user: In this stage, the client sends the OTP typed in by the user for verification. The OTP code, reason
and user are all validated to be correct:
- The otp must be correct
- The otp must not have expired
- The otp must belong to the user
- The otp must be for the right reasons
The server creates another OTP (basically a temporary token) to store the state of this password reset. This token expires
really quickly and is hidden from the user. The client is to submit this token with the new password of the user.
The reason for this process is:
- To maintain a form of state for the password reset process
- To ensure the client has a way of immediately verifying the OTP sent by the user
- To ensure security by invalidating existing password reset processes by malicious users. If during stage 2, another password
reset process is also in stage 2, the most recent lives and the oldest is invalidated.
3. Complete password reset: The token from stage 2 is sent by the client alongside the new password. The token is validated
also based on the criteria in stage 2. The user's password is reset at this point.
"""
# stage 1:
from datetime import timedelta
from typing import Dict, Optional
from rest_framework import serializers, generics, response, status, views
from .models import AuthOTP, User, UserAuthOTP
def convert_duration_to_minutes(duration: timedelta) -> int:
return int((duration.total_seconds() / 5))
class InitializePasswordResetSerializer(serializers.Serializer):
"""
Serializer handling stage one of the process
"""
EMAIL_SUBJECT = "Reset your account password"
EMAIL_MESSAGE = (
"Use this code to reset your account password:\n"
"{}\n"
"This code will expire in {} minutes."
)
EMAIL_SENDER = "App <no-reply@app.com>"
email = serializers.EmailField()
def validate_email(self, value) -> Optional[User]:
try:
user = User.objects.get(email=value)
except User.DoesNotExist:
return
return user
def save(self):
user: Optional[User] = self.validated_data["email"]
if not user:
return
user_auth_otp = UserAuthOTP.create_otp_for_user(
user, UserAuthOTP.OTPReasons.FORGOT_PASSWORD
)
user.email_user(
self.EMAIL_SUBJECT,
self.EMAIL_MESSAGE.format(
user_auth_otp["complete_otp"]["code"],
convert_duration_to_minutes(
user_auth_otp["complete_otp"]["instance"].timeout
),
),
self.EMAIL_SENDER,
)
class InitializePasswordResetAPIView(generics.GenericAPIView):
"""
Endpoint handling stage one of the process
"""
serializer_class = InitializePasswordResetSerializer
def post(self, request, *args, **kwargs):
serializer = self.serializer_class(data=request.data)
serializer.is_valid(raise_exception=True)
serializer.save()
return response.Response(status=status.HTTP_204_NO_CONTENT)
# stage 2
class ReceiveOTPForPasswordResetSerializer(serializers.Serializer):
"""
Serializer handling stage two of the process
"""
ERROR_MESSAGE = "OTP is invalid"
email = serializers.EmailField(write_only=True)
code = serializers.IntegerField(read_only=True)
otp = serializers.IntegerField(write_only=True)
def validate(self, attrs):
verified = False
try:
user: User = User.objects.get(email=attrs["email"])
# we're setting context here only because
# we want to raise this error inside the serializer
self.context["user"] = user
user_auth_otp = UserAuthOTP.objects.get(
user=user, reason=UserAuthOTP.OTPReasons.FORGOT_PASSWORD
)
verified = AuthOTP.verify_otp(user_auth_otp.otp, int(attrs["otp"]))
except (User.DoesNotExist, UserAuthOTP.DoesNotExist):
raise serializers.ValidationError(self.ERROR_MESSAGE)
if not verified:
raise serializers.ValidationError(self.ERROR_MESSAGE)
return attrs
@transaction.atomic
def save(self):
user_auth_otp = UserAuthOTP.create_otp_for_user(
self.context["user"], UserAuthOTP.OTPReasons.FORGOT_PASSWORD_TOKEN
)
UserAuthOTP.objects.get(
user=self.context["user"],
reason=UserAuthOTP.OTPReasons.FORGOT_PASSWORD,
).delete()
return {"code": user_auth_otp["complete_otp"]["code"]}
class ReceiveOTPForPasswordResetAPIView(generics.CreateAPIView):
"""
Endpoint handling stage two of the process
"""
serializer_class = ReceiveOTPForPasswordResetSerializer
def post(self, request, *args, **kwargs):
serializer = self.serializer_class(data=request.data)
serializer.is_valid(raise_exception=True)
result = serializer.save()
return response.Response(
result,
status=status.HTTP_200_OK,
)
# stage 3
class CompletePasswordResetSerializer(serializers.Serializer):
"""
Serializer handling stage three of the process
"""
ERROR_MESSAGE = "Password reset request invalid"
code = serializers.IntegerField(write_only=True)
email = serializers.EmailField()
password = serializers.CharField(min_length=8, write_only=True)
def validate(self, attrs):
verified = False
try:
user: User = User.objects.get(email=attrs["email"])
# we're setting context here only because
# we want to raise this error inside the serializer
self.context["user"] = user
user_auth_otp = UserAuthOTP.objects.get(
user=user, reason=UserAuthOTP.OTPReasons.FORGOT_PASSWORD_TOKEN
)
verified = AuthOTP.verify_otp(user_auth_otp.otp, attrs["code"])
except (User.DoesNotExist, UserAuthOTP.DoesNotExist) as e:
raise serializers.ValidationError(self.ERROR_MESSAGE)
if not verified:
raise serializers.ValidationError(self.ERROR_MESSAGE)
return attrs
def save(self):
user = self.context["user"]
user.set_password(self.validated_data["password"])
user.save()
user_auth_otp = UserAuthOTP.objects.get(
user=user, reason=UserAuthOTP.OTPReasons.FORGOT_PASSWORD_TOKEN
)
user_auth_otp.delete()
class CompletePasswordResetAPIView(generics.GenericAPIView):
"""
Endpoint handling stage three of the process
"""
serializer_class = CompletePasswordResetSerializer
def post(self, request, *args, **kwargs):
serializer = self.serializer_class(data=request.data)
serializer.is_valid(raise_exception=True)
serializer.save()
return response.Response(
{"message": "Password reset successfully"},
status=status.HTTP_200_OK,
)
"""
Two APIs are called to verify a user:
1. The `InitializeActivateUserAPIView` is first called to send OTP to the user's
email address (phone numbers can be used with Twilio too). This endpoint handles checks on whether the user
has been verified before or not. It returns a 204 response if the OTP is sent and a 403 if the user is already verified.
2. The `CompleteUserVerificationProcessAPIView` is used to verify the code sent by the user through the client and
activates the user if everything is right:
- The otp sent is correct
- The otp hasn't expired
- The otp is for the right reason
"""
from typing import Any, Dict, Optional
from django.http import HttpRequest, HttpResponsePermanentRedirect
from django.shortcuts import redirect
from rest_framework import generics, response, status, views
from rest_framework.permissions import IsAuthenticated
from .models import AuthOTP, UserAuthOTP, User
class InitializeActivateUserAPIView(generics.CreateAPIView):
"""
This is the first endpoint to be called.
"""
EMAIL_SUBJECT = "Activate your account"
EMAIL_MESSAGE = (
"Use this code to activate your account:\n"
"{}\n"
"This code will expire in {} minutes."
)
EMAIL_SENDER = "App <no-reply@app.com>"
permission_classes = [IsAuthenticated]
def process_verification_initialization(self):
user: User = self.request.user
if user.is_verified:
return False
# here, we simply create an OTP for activating the user
# using the functionalities in the otp module. If there is
# an existing OTP that hasn't been used and is still live,
# it is detached and there by invalidated here.
user_auth_otp = UserAuthOTP.create_otp_for_user(
self.request.user, UserAuthOTP.OTPReasons.ACTIVATE_USER
)
# the `AbstractUser` which the `User` model inherits
# implements methods like `email_user` which can be used
# to send emails to the user.
# this method call may not run due to some unforseen circumstances
# from mailgun or ourselves. We are not catching this exception
# so we can investigate it in case it happens.
user.email_user(
self.EMAIL_SUBJECT,
self.EMAIL_MESSAGE.format(
user_auth_otp["complete_otp"]["code"],
convert_duration_to_minutes(
user_auth_otp["complete_otp"]["instance"].timeout
),
),
self.EMAIL_SENDER,
)
return True
def post(self, request, *args, **kwargs):
if self.process_verification_initialization():
return response.Response(status=status.HTTP_204_NO_CONTENT)
return response.Response(
data={"details": "User already verified"},
status=status.HTTP_403_FORBIDDEN,
)
from rest_framework import status
from rest_framework.exceptions import APIException
class InvalidOTP(APIException):
"""
A custom exception wrapper over the 422 status code
"""
status_code = status.HTTP_422_UNPROCESSABLE_ENTITY
default_detail = (
"This OTP is either: invalid, already used or already expired."
)
default_code = "wrong_otp"
from typing import Dict, Optional
from django.db import transaction
from rest_framework import serializers
from .models import AuthOTP, User, UserAuthOTP
class CompleteVerificationProcessSerializer(serializers.Serializer):
"""
This is simply a class that serializes input from the client and processes
the data to perform actions on behalf of the endpoint.
"""
code = serializers.IntegerField()
def validate_code(self, value: int):
try:
# first we look for the `UserAuthOTP` in the database
user_auth_otp = UserAuthOTP.objects.get(
user=self.context["request"].user,
reason=UserAuthOTP.OTPReasons.ACTIVATE_USER,
)
except UserAuthOTP.DoesNotExist:
# we can't find it!
raise InvalidOTP
# next we verify if the code submitted by the user is
# correct
if not AuthOTP.verify_otp(user_auth_otp.otp, value):
raise exceptions.InvalidOTP
return user_auth_otp
# atomic so it rollsback in case something goes wrong
@transaction.atomic
def save(self):
user: User = self.context["request"].user
user.is_verified = True
user.save()
self.validated_data["code"].delete()
return
class CompleteUserVerificationProcessAPIView(generics.CreateAPIView):
"""
This endpoint relies on the serializer above to do the main work
"""
permission_classes = [IsAuthenticated]
serializer_class = CompleteVerificationProcessSerializer
def post(self, request, *args, **kwargs):
serializer = self.serializer_class(
data=request.data, context={"request": request}
)
# this line runs the code in `validate_code` for us
serializer.is_valid(raise_exception=True)
serializer.save()
return response.Response(
{"message": "User successfully verified"},
status=status.HTTP_200_OK,
)
from django.contrib.auth.models import AbstractUser
from django.conf import settings
from django.db import models
from utils.models import TrackObjectStateMixin
class User(AbstractUser, TrackObjectStateMixin):
email = models.EmailField(unique=True)
is_verified = models.BooleanField(default=False, index=True)
USERNAME_FIELD = "email"
REQUIRED_FIELDS = ["username"]
import uuid
from django.db import models
class TrackObjectStateMixin(models.Model):
uuid = models.UUIDField(default=uuid.uuid4, unique=True)
created = models.DateTimeField(auto_now_add=True)
last_updated = models.DateTimeField(auto_now=True)
class Meta:
abstract = True
@melodyogonna
Copy link

Very robust implementation chief, well done.

@lordsarcastic
Copy link
Author

Very robust implementation chief, well done.

Thank you 🔥

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment