-
-
Save AndrewIngram/9d3dd742d1801e898add1e2aa5130639 to your computer and use it in GitHub Desktop.
Draft: Django Ninja Idempotency Keys
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Custom exception handler
@api.exception_handler(Exception)
def service_unavailable(request, exc):
    """Answer any unhandled exception with a generic HTTP 500 response.

    If the exception is an ``IdempotencyWrappedException``, the idempotency
    entry it carries is finalised with the same status and body before the
    response is built.
    """
    error_status = 500
    payload = {"detail": "Something went wrong, our team has been notified of the issue."}

    if isinstance(exc, IdempotencyWrappedException):
        # Persist the failure on the entry that was in flight when the view raised.
        finalise_idempotency_entry(exc.entry, error_status, payload)

    return api.create_response(request, payload, status=error_status)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import hashlib | |
import json | |
from datetime import timedelta | |
from functools import partial, wraps | |
from typing import Any, Callable, Dict, Tuple | |
from ninja import Header | |
from ninja.operation import Operation | |
from pydantic import BaseModel | |
from django.utils import timezone | |
from .core import Conflict, UnprocessableEntity | |
from .models import IdempotencyEntry, IdempotencyEntryMethods, IdempotencyEntryState | |
class IdempotencyWrappedException(Exception):
    """Pairs an exception raised by a view with the idempotency entry in use.

    Attributes are set from the first two constructor arguments; any further
    positional or keyword arguments are forwarded to ``Exception``.
    """

    entry: IdempotencyEntry
    original_exception: Exception

    def __init__(self, original_exception, entry, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # The exception the wrapped view raised.
        self.original_exception = original_exception
        # The IdempotencyEntry that was being processed at the time.
        self.entry = entry
def finalise_idempotency_entry(entry, status_code, response_dict):
    """Record a response on *entry*, mark it complete and save it.

    ``response_dict`` is stored as a JSON string in ``entry.response_body``;
    ``status_code`` goes to ``entry.response_status``.
    """
    entry.response_status = status_code
    serialised_body = json.dumps(response_dict)
    entry.response_body = serialised_body
    entry.state = IdempotencyEntryState.COMPLETE
    entry.save()
def hashable_value(value):
    """Return *value* ready for fingerprinting.

    Pydantic ``BaseModel`` instances are replaced by their ``.json()`` string;
    every other value is returned unchanged.
    """
    return value.json() if isinstance(value, BaseModel) else value
def generate_request_fingerprint(*args, **kwargs):
    """Return a SHA-256 hex digest identifying a request's inputs.

    Every positional argument, and every ``(key, value)`` keyword pair, is
    passed through ``hashable_value`` and JSON-encoded; the encoded pieces are
    joined with ``":"`` and hashed.

    Values that ``json`` cannot encode natively (for example ``UUID`` or
    ``date`` arguments) fall back to their ``str()`` form instead of raising
    ``TypeError``. Inputs that were already encodable produce exactly the same
    digest as before.

    Keyword arguments are hashed in the order they were passed.
    """

    def encode(item):
        # ``default=str`` is only consulted for objects json cannot serialise.
        return json.dumps(item, default=str)

    positional_parts = [encode(hashable_value(arg)) for arg in args]
    # The trailing space after each keyword part belongs to the existing
    # fingerprint format; dropping it would change the digest produced for
    # any call that passes keyword arguments.
    keyword_parts = [
        encode((key, hashable_value(value))) + " " for key, value in kwargs.items()
    ]
    joined = ":".join(positional_parts + keyword_parts)
    return hashlib.sha256(joined.encode("utf-8")).hexdigest()
def add_idempotency_responses_to_operation(op: Operation):
    """Add 409 and 422 response models to *op* where it does not declare them.

    Status codes already present in ``op.response_models`` are left untouched.
    """
    declared = op.response_models.keys()
    extra_responses = ((409, Conflict), (422, UnprocessableEntity))
    for status, schema in extra_responses:
        if status not in declared:
            op.response_models[status] = op._create_response_model(schema)
def _inject_idempotency(view_func: Callable, is_required=False):
    """Wrap a Ninja view so that it honours an ``Idempotency-Key`` header.

    The wrapper contributes a ``fronted_idempotency_key`` header argument
    (alias ``Idempotency-Key``) to the operation and registers the 409/422
    response models via ``add_idempotency_responses_to_operation``.

    Per request:

    * If the HTTP method is not one of ``IdempotencyEntryMethods``, or no key
      was sent and the key is not required, the view runs unmodified.
    * Otherwise an ``IdempotencyEntry`` is fetched or created for
      ``(user, key)`` among entries created within the last day.
    * For an existing entry: mismatching method/path/fingerprint yields 422,
      an entry that is not yet complete yields 409, and a complete entry
      replays the stored ``(status, body)``.
    * For a new entry the view runs and its result is stored on the entry.

    Args:
        view_func: The view function to wrap.
        is_required: When true the header is declared as required.

    Returns:
        The wrapped view function.

    Raises:
        ValueError: From the wrapper, when ``request.auth`` is falsy.
        IdempotencyWrappedException: From the wrapper, when the view raises;
            the original exception is attached and chained as ``__cause__``.
    """

    @wraps(view_func)
    def view_with_func_idempotency(request, *args: Tuple[Any], **kwargs: Dict[str, Any]) -> Any:
        # Remove the injected header argument so the view never sees it.
        idempotency_key = kwargs.pop("fronted_idempotency_key")
        method = request.method.upper()

        if not any(x.value == method for x in IdempotencyEntryMethods) or (
            not idempotency_key and not is_required
        ):
            # Idempotency not used, handle view as normal
            return view_func(request, *args, **kwargs)

        user = request.auth
        if not user:
            raise ValueError("Anonymous users can't use idempotency keys")

        # Keys are only honoured for one day: look for entries created at or
        # after this cutoff. (Comparing with ``now + 1 day`` using ``__lte``
        # matched every row ever stored, so keys never expired.)
        cutoff = timezone.now() - timedelta(days=1)
        request_path = request.path
        request_fingerprint = generate_request_fingerprint(request.path, *args, **kwargs)

        # The ``created_at__gte`` lookup only filters the "get" side; lookups
        # containing ``__`` are not used as field values when creating.
        entry, created = IdempotencyEntry.objects.get_or_create(
            user=user,
            idempotency_key=idempotency_key,
            created_at__gte=cutoff,
            defaults=dict(
                request_path=request_path,
                request_method=IdempotencyEntryMethods[method],
                request_fingerprint=request_fingerprint,
            ),
        )

        if not created:
            # Same key but different method, path or inputs: 422.
            # Same request still in flight (entry not complete): 409.
            if not all(
                [
                    entry.request_method == IdempotencyEntryMethods[method],
                    entry.request_path == request_path,
                    entry.request_fingerprint == request_fingerprint,
                ]
            ):
                return 422, UnprocessableEntity(
                    message="Request inputs not a match for existing idempotency key"
                )
            elif not entry.state == IdempotencyEntryState.COMPLETE:
                return 409, Conflict(
                    message="Conflict with incomplete request for existing idempotency key"
                )

            # Replay the stored response for a completed request.
            return entry.response_status, json.loads(entry.response_body)

        try:
            result = view_func(request, *args, **kwargs)
        except Exception as exc:
            # Attach the entry so an exception handler can finalise it.
            # ``Exception`` (not ``BaseException``) lets KeyboardInterrupt and
            # SystemExit propagate untouched; ``from exc`` keeps the traceback.
            raise IdempotencyWrappedException(exc, entry) from exc

        # Views may return either ``(status, schema)`` or a bare schema.
        if isinstance(result, tuple) and len(result) == 2:
            status_code = result[0]
            response_dict = result[1].dict()
        else:
            status_code = 200
            response_dict = result.dict()

        finalise_idempotency_entry(entry, status_code, response_dict)
        return result

    # Carry over any arguments already contributed to the wrapped view, then
    # add the idempotency header argument.
    if hasattr(view_func, "_ninja_contribute_args"):
        view_with_func_idempotency._ninja_contribute_args = (  # type: ignore
            view_func._ninja_contribute_args  # type: ignore
        )
    else:
        view_with_func_idempotency._ninja_contribute_args = []  # type: ignore

    view_with_func_idempotency._ninja_contribute_args.append(  # type: ignore
        (
            "fronted_idempotency_key",
            str,
            # ``...`` marks the header as required; ``None`` makes it optional.
            Header(... if is_required else None, alias="Idempotency-Key"),
        ),
    )
    view_with_func_idempotency._ninja_contribute_to_operation = (
        add_idempotency_responses_to_operation
    )

    return view_with_func_idempotency
def idempotent(fn) -> Callable:
    """Decorate a view with idempotency-key handling (key not required).

    Usage::

        @api.post(...)
        @idempotent
        def my_view(request):
            ...
    """
    wrapped_view = _inject_idempotency(fn)
    return wrapped_view
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class IdempotencyEntryState(models.TextChoices):
    """States stored in ``IdempotencyEntry.state``."""

    # Default for a newly created entry.
    PENDING = "PENDING", "Pending"
    # A response status and body have been recorded for the entry.
    COMPLETE = "COMPLETE", "Complete"
class IdempotencyEntryMethods(models.TextChoices):
    """HTTP methods stored in ``IdempotencyEntry.request_method``."""

    POST = "POST", "Post"
    PATCH = "PATCH", "Patch"
class IdempotencyEntry(models.Model):
    """One request made by a user under a given idempotency key.

    Stores what was requested (path, method, fingerprint) and, once available,
    the response status and body.
    """

    id = models.UUIDField(primary_key=True, default=uuid4, editable=False)
    idempotency_key = models.TextField(db_index=True)
    state = models.CharField(
        max_length=8,
        choices=IdempotencyEntryState.choices,
        default=IdempotencyEntryState.PENDING,
    )
    user = models.ForeignKey(
        "auth.User",
        related_name="idempotency_entries",
        on_delete=models.CASCADE,
    )

    # Request details.
    request_path = models.TextField()
    request_method = models.CharField(
        max_length=5,
        choices=IdempotencyEntryMethods.choices,
    )
    request_fingerprint = models.TextField()

    # Response details; nullable, so they may be empty until a response is recorded.
    response_body = models.TextField(null=True)
    response_status = models.PositiveSmallIntegerField(null=True)

    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)

    class Meta:
        # Composite index on (user, idempotency_key).
        # NOTE(review): ``index_together`` is deprecated in recent Django
        # releases in favour of ``Meta.indexes``; switching requires a
        # migration, so it is left unchanged here.
        index_together = (("user", "idempotency_key"),)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment