Skip to content

Instantly share code, notes, and snippets.

@teimurjan
Created July 5, 2018 04:21
Show Gist options
  • Save teimurjan/5dcc351c06f5b67e403d37c10dd88634 to your computer and use it in GitHub Desktop.
Save teimurjan/5dcc351c06f5b67e403d37c10dd88634 to your computer and use it in GitHub Desktop.
django-solid-architecture
class DTO:
    """Base data-transfer object that carries a read-only identifier."""

    def __init__(self, id_):
        """Remember *id_* as this object's identifier."""
        # Stored under a private name; the public ``id`` property below has
        # no setter, so the value cannot be reassigned through it.
        self._id = id_

    @property
    def id(self):
        """Return the identifier supplied at construction time."""
        return self._id
class UserDTO(DTO):
    """Read-only snapshot of a user's data.

    Every constructor argument is stored on a private attribute and exposed
    through a getter-only property of the same name.
    """

    def __init__(self, id_, email, name, date_joined, password, group, is_active):
        """Store the user's fields; ``id_`` is handled by the base class."""
        super().__init__(id_)
        self._email = email
        self._name = name
        self._date_joined = date_joined
        self._password = password
        self._group = group
        self._is_active = is_active

    @property
    def email(self):
        """Return the email passed to the constructor."""
        return self._email

    @property
    def name(self):
        """Return the name passed to the constructor."""
        return self._name

    @property
    def date_joined(self):
        """Return the ``date_joined`` value passed to the constructor."""
        return self._date_joined

    @property
    def password(self):
        """Return the password value passed to the constructor."""
        return self._password

    @property
    def group(self):
        """Return the group value passed to the constructor."""
        return self._group

    @property
    def is_active(self):
        """Return the ``is_active`` flag passed to the constructor."""
        return self._is_active
from django.db import models
from api.dto.user import UserDTO
from api.models.group import Group
class User(models.Model):
    """Django model for a user; converts itself to a ``UserDTO`` via ``to_dto``."""

    email = models.EmailField(unique=True, blank=False, null=False)
    name = models.CharField(max_length=60, blank=False, null=False)
    date_joined = models.DateTimeField(auto_now_add=True)
    password = models.CharField(max_length=255, blank=False, null=False)
    # The foreign key column is explicitly named "group"; deleting a group
    # cascades to its users.
    group = models.ForeignKey(
        Group,
        related_name='users',
        related_query_name='user',
        db_column='group',
        on_delete=models.CASCADE,
    )
    is_active = models.BooleanField(default=False)

    def to_dto(self):
        """Return a ``UserDTO`` built from this row, with the group converted too."""
        group_dto = self.group.to_dto()
        return UserDTO(
            self.id,
            self.email,
            self.name,
            self.date_joined,
            self.password,
            group_dto,
            self.is_active,
        )
import bcrypt
from api.models import User
class Repo:
    """Generic repository that wraps a model class and returns DTOs.

    ``model_cls`` is expected to expose a Django-style ``objects`` manager
    (``get``, ``filter``, ``all``) and a ``DoesNotExist`` exception, and its
    instances are expected to provide ``save()``, ``delete()`` and
    ``to_dto()``.

    Model-level "not found" errors are translated into ``Repo.DoesNotExist``
    so callers never have to depend on the model class itself.
    """

    class DoesNotExist(Exception):
        """Raised when no object matches the requested id or filter."""

    class InvalidInstance(Exception):
        """Kept for backward compatibility; no longer raised by this class."""

    class InvalidAttribute(Exception):
        """Raised when a keyword argument names an attribute the model lacks."""

    def __init__(self, model_cls):
        """Bind the repository to *model_cls*."""
        self.model_cls = model_cls

    def get_by_id(self, id_):
        """Return the DTO of the object with id *id_*.

        Raises:
            Repo.DoesNotExist: if no such object exists.
        """
        return self._get_model_obj_by_id(id_).to_dto()

    def _get_model_obj_by_id(self, id_):
        """Fetch the model object with id *id_*, translating the not-found error."""
        try:
            return self.model_cls.objects.get(id=id_)
        except self.model_cls.DoesNotExist as exc:
            # Chain the original error so the underlying cause stays visible
            # in tracebacks.
            raise self.DoesNotExist() from exc

    def _assign_attributes(self, model_obj, attrs):
        """Copy *attrs* onto *model_obj*, rejecting unknown attribute names.

        ``None`` values are skipped, so ``None`` means "leave unchanged" and
        cannot be used to clear a field.
        """
        for attr, value in attrs.items():
            if not hasattr(model_obj, attr):
                raise self.InvalidAttribute()
            if value is not None:
                setattr(model_obj, attr, value)

    def update(self, id_, **kwargs):
        """Update the object with id *id_* from *kwargs* and return its DTO.

        Raises:
            Repo.DoesNotExist: if no such object exists.
            Repo.InvalidAttribute: if a keyword is not a model attribute.
        """
        # Only the lookup is guarded: a DoesNotExist raised later by save()
        # or to_dto() would be a different problem and must not be reported
        # as "object not found".
        model_obj = self._get_model_obj_by_id(id_)
        self._assign_attributes(model_obj, kwargs)
        model_obj.save()
        return model_obj.to_dto()

    def create(self, **kwargs):
        """Create and save a new object from *kwargs* and return its DTO.

        Raises:
            Repo.InvalidAttribute: if a keyword is not a model attribute.
        """
        model_obj = self.model_cls()
        self._assign_attributes(model_obj, kwargs)
        model_obj.save()
        return model_obj.to_dto()

    def get_all(self):
        """Return the DTOs of all objects, ordered by primary key, as a tuple."""
        return tuple(obj.to_dto() for obj in self.model_cls.objects.all().order_by('pk'))

    def get_first_by(self, **kwargs):
        """Return the DTO of the first object matching the *kwargs* filter.

        Raises:
            Repo.DoesNotExist: if nothing matches.
        """
        # first() returns None for an empty result, which replaces the
        # separate exists() query followed by indexing.
        model_obj = self.model_cls.objects.filter(**kwargs).first()
        if model_obj is None:
            raise self.DoesNotExist()
        return model_obj.to_dto()

    def filter_by(self, **kwargs):
        """Return the DTOs of all objects matching *kwargs*, ordered by primary key."""
        return tuple(obj.to_dto() for obj in self.model_cls.objects.filter(**kwargs).order_by('pk'))

    def delete(self, id_):
        """Delete the object with id *id_*.

        Raises:
            Repo.DoesNotExist: if no such object exists.
        """
        self._get_model_obj_by_id(id_).delete()
def encrypt_password(password: str) -> str:
    """Return the bcrypt hash of *password*, decoded to text.

    A new salt is generated on every call.
    """
    raw = password.encode()
    salt = bcrypt.gensalt()
    hashed = bcrypt.hashpw(raw, salt)
    return hashed.decode('utf-8')
class UserRepo(Repo):
    """Repository for ``User`` objects that hashes passwords before storing them."""

    def __init__(self):
        """Bind the repository to the ``User`` model."""
        super().__init__(User)

    def create(self, password, **kwargs):
        """Create a user, storing the hash of *password* instead of the plain text."""
        return super().create(password=encrypt_password(password), **kwargs)

    def update(self, user_id, password=None, **kwargs):
        """Update the user with id *user_id* and return its DTO.

        The password is re-hashed and stored only when a non-empty *password*
        is given; otherwise the stored password is left untouched.
        """
        if password:
            kwargs['password'] = encrypt_password(password)
        return super().update(user_id, **kwargs)

    def filter_by_roles(self, roles):
        """Return the DTOs of users whose role is in *roles*, ordered by primary key.

        The result is a tuple, matching ``filter_by`` and ``get_all``, so the
        query runs here and the result can be iterated more than once.
        """
        # NOTE(review): the User model visible in this file declares no `role`
        # field (it has `group`); confirm this lookup name against the real model.
        return self.filter_by(role__in=roles)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment