Skip to content

Instantly share code, notes, and snippets.

@AndrewIngram
Last active June 14, 2022 15:25
Show Gist options
  • Save AndrewIngram/9d3dd742d1801e898add1e2aa5130639 to your computer and use it in GitHub Desktop.
Save AndrewIngram/9d3dd742d1801e898add1e2aa5130639 to your computer and use it in GitHub Desktop.
Draft: Django Ninja Idempotency Keys
# Custom exception handler
@api.exception_handler(Exception)
def service_unavailable(request, exc):
status_code = 500
body = {"detail": "Something went wrong, our team has been notified of the issue."}
if isinstance(exc, IdempotencyWrappedException):
finalise_idempotency_entry(exc.entry, status_code, body)
response = api.create_response(
request,
body,
status=status_code,
)
return response
import hashlib
import json
from datetime import timedelta
from functools import partial, wraps
from typing import Any, Callable, Dict, Tuple
from ninja import Header
from ninja.operation import Operation
from pydantic import BaseModel
from django.utils import timezone
from .core import Conflict, UnprocessableEntity
from .models import IdempotencyEntry, IdempotencyEntryMethods, IdempotencyEntryState
class IdempotencyWrappedException(Exception):
entry: IdempotencyEntry
original_exception: Exception
def __init__(self, original_exception, entry, *args, **kwargs):
super().__init__(*args, **kwargs)
self.entry = entry
self.original_exception = original_exception
def finalise_idempotency_entry(entry, status_code, response_dict):
entry.response_status = status_code
entry.response_body = json.dumps(response_dict)
entry.state = IdempotencyEntryState.COMPLETE
entry.save()
def hashable_value(value):
if isinstance(value, BaseModel):
return value.json()
return value
def generate_request_fingerprint(*args, **kwargs):
hashed_args = ["%s" % json.dumps(hashable_value(arg)) for arg in args]
hashed_kwargs = [
"%s " % json.dumps((key, hashable_value(value))) for (key, value) in kwargs.items()
]
return hashlib.sha256(":".join(hashed_args + hashed_kwargs).encode("utf-8")).hexdigest()
def add_idempotency_responses_to_operation(op: Operation):
status_codes = op.response_models.keys()
if 409 not in status_codes:
op.response_models[409] = op._create_response_model(Conflict)
if 422 not in status_codes:
op.response_models[422] = op._create_response_model(UnprocessableEntity)
def _inject_idempotency(view_func: Callable, is_required=False):
@wraps(view_func)
def view_with_func_idempotency(request, *args: Tuple[Any], **kwargs: Dict[str, Any]) -> Any:
idempotency_key = kwargs.pop("fronted_idempotency_key")
method = request.method.upper()
if not any(x.value == method for x in IdempotencyEntryMethods) or (
not idempotency_key and not is_required
):
# Idempotency not used, handle view as normal
return view_func(request, *args, **kwargs)
user = request.auth
if not user:
raise ValueError("Anonymous users can't use idempotency keys")
cutoff = timezone.now() + timedelta(days=1)
request_path = request.path
request_fingerprint = generate_request_fingerprint(request.path, *args, **kwargs)
entry, created = IdempotencyEntry.objects.get_or_create(
user=user,
idempotency_key=idempotency_key,
created_at__lte=cutoff,
defaults=dict(
request_path=request_path,
request_method=IdempotencyEntryMethods[method],
request_fingerprint=request_fingerprint,
),
)
if not created:
# If path, fingerprint or state are inappropriate, return 409 conflict
if not all(
[
entry.request_method == IdempotencyEntryMethods[method],
entry.request_path == request_path,
entry.request_fingerprint == request_fingerprint,
]
):
return 422, UnprocessableEntity(
message="Request inputs not a match for existing idempotency key"
)
elif not entry.state == IdempotencyEntryState.COMPLETE:
return 409, Conflict(
message="Conflict with incomplete request for existing idempotency key"
)
return entry.response_status, json.loads(entry.response_body)
try:
result = view_func(request, *args, **kwargs)
except BaseException as exc:
raise IdempotencyWrappedException(exc, entry)
if isinstance(result, tuple) and len(result) == 2:
status_code = result[0]
response_dict = result[1].dict()
else:
status_code = 200
response_dict = result.dict()
finalise_idempotency_entry(entry, status_code, response_dict)
return result
if hasattr(view_func, "_ninja_contribute_args"):
view_with_func_idempotency._ninja_contribute_args = ( # type: ignore
view_func._ninja_contribute_args # type: ignore
)
else:
view_with_func_idempotency._ninja_contribute_args = [] # type: ignore
view_with_func_idempotency._ninja_contribute_args.append( # type: ignore
(
"fronted_idempotency_key",
str,
Header(... if is_required else None, alias="Idempotency-Key"),
),
)
view_with_func_idempotency._ninja_contribute_to_operation = (
add_idempotency_responses_to_operation
)
return view_with_func_idempotency
def idempotent(fn) -> Callable:
"""
@api.post(...)
@idempotent
def my_view(request):
...
"""
return _inject_idempotency(fn)
class IdempotencyEntryState(models.TextChoices):
PENDING = "PENDING", "Pending"
COMPLETE = "COMPLETE", "Complete"
class IdempotencyEntryMethods(models.TextChoices):
POST = "POST", "Post"
PATCH = "PATCH", "Patch"
class IdempotencyEntry(models.Model):
id = models.UUIDField(primary_key=True, default=uuid4, editable=False)
idempotency_key = models.TextField(db_index=True)
state = models.CharField(
max_length=8, choices=IdempotencyEntryState.choices, default=IdempotencyEntryState.PENDING
)
user = models.ForeignKey(
"auth.User", related_name="idempotency_entries", on_delete=models.CASCADE
)
request_path = models.TextField()
request_method = models.CharField(max_length=5, choices=IdempotencyEntryMethods.choices)
request_fingerprint = models.TextField()
response_body = models.TextField(null=True)
response_status = models.PositiveSmallIntegerField(null=True)
created_at = models.DateTimeField(auto_now_add=True)
updated_at = models.DateTimeField(auto_now=True)
class Meta:
index_together = (
(
"user",
"idempotency_key",
),
)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment