Skip to content

Instantly share code, notes, and snippets.

@devxpy
Last active April 26, 2020 11:49
Show Gist options
  • Save devxpy/e7a1e639dce564a045de14c9668a7864 to your computer and use it in GitHub Desktop.
Save devxpy/e7a1e639dce564a045de14c9668a7864 to your computer and use it in GitHub Desktop.
Django Async CRUD - Basic CRUD operations with Django ORM in an async context
import typing
from django.db import models
from channels.db import database_sync_to_async
M = typing.TypeVar("M", bound=models.Model)
class AsyncCrud:
def __init__(self, model: typing.Type[M]):
self.model = model
@database_sync_to_async
def create(self, **kwargs) -> typing.Awaitable[M]:
return self.model.objects.create(**kwargs)
@database_sync_to_async
def read(self, obj_id) -> typing.Awaitable[M]:
return self.model.objects.get(id=obj_id)
@database_sync_to_async
def update(self, obj_id, **updates) -> typing.Awaitable[M]:
obj = self.model.objects.get(id=obj_id)
for attr, value in updates.items():
setattr(obj, attr, value)
obj.save()
return obj
@database_sync_to_async
def delete(self, obj_id) -> typing.Awaitable[M]:
obj = self.model.objects.get(id=obj_id)
obj.delete()
return obj
#
# usage
#
# Polls is a django model
PollsCrud = AsyncCrud(Polls)
# C
poll = await PollsCrud.create(name="Are you ready to async?")
# R
print(await PollsCrud.read(poll.id)
# U
await PollsCrud.update(poll.id, votes=5)
# D
await PollsCrud.delete(poll.id)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment