Skip to content

Instantly share code, notes, and snippets.

@rodrigondec
Last active June 5, 2019 17:10
Show Gist options
  • Save rodrigondec/98070516a7231cc472d9420a6423b01b to your computer and use it in GitHub Desktop.
Save rodrigondec/98070516a7231cc472d9420a6423b01b to your computer and use it in GitHub Desktop.
Django Utils
from pydoc import locate
from django.conf import settings
def jwt_response_payload_handler(token, user=None, request=None):
    """Build the payload returned after a JWT login or token refresh.

    The user is serialized with the class named by the
    ``AUTH_USER_SERIALIZER`` setting (a dotted import path) and the token is
    added to the serialized data under the ``token`` key.

    Args:
        token: Encoded JWT to include in the payload.
        user: User instance handed to the configured serializer.
        request: Not used by this handler.

    Returns:
        The serializer's ``data`` with an extra ``token`` entry.

    Raises:
        ImportError: If ``AUTH_USER_SERIALIZER`` is missing, empty, or does
            not resolve to an importable object.
    """
    # Read the setting defensively so a missing setting produces the same
    # explicit configuration error as an unresolvable path.
    serializer_path = getattr(settings, 'AUTH_USER_SERIALIZER', None)
    user_serializer = locate(serializer_path) if serializer_path else None
    if user_serializer is None:
        raise ImportError('Configure a variável "AUTH_USER_SERIALIZER" para utilizar no JWT')

    result = user_serializer(user).data
    result['token'] = token
    return result
import uuid
import pendulum
from factory import DjangoModelFactory, lazy_attribute
from factory.faker import Faker
from .models import BaseModel, BasePointModel, TIMEZONE
class BaseModelFactory(DjangoModelFactory):
    """Abstract factory supplying the fields shared by ``BaseModel`` subclasses."""

    class Meta:
        model = BaseModel
        abstract = True

    id = lazy_attribute(lambda _: uuid.uuid4())
    # Timestamps are computed per built object. A plain ``pendulum.now(...)``
    # here would run once, when the class body is executed, and every object
    # produced by the factory would share that single value.
    created_at = lazy_attribute(lambda _: pendulum.now(TIMEZONE))
    updated_at = lazy_attribute(lambda _: pendulum.now(TIMEZONE))
class BasePointModelFactory(BaseModelFactory):
    """Abstract factory for ``BasePointModel`` subclasses.

    Adds faked ``latitude`` and ``longitude`` values on top of the fields
    provided by ``BaseModelFactory``.
    """

    class Meta:
        model = BasePointModel
        abstract = True

    latitude = Faker('latitude')
    longitude = Faker('longitude')
from rest_framework import status
from rest_framework.response import Response
from rest_framework.views import exception_handler
from django.db import IntegrityError
def error_handler(exc, context):
    """Exception handler that turns database integrity errors into 400s.

    REST framework's default ``exception_handler`` is tried first. When it
    does not produce a response and ``exc`` is an ``IntegrityError``, a
    ``400 Bad Request`` is returned whose body carries a
    ``non_field_errors`` list; the list contains a message when the error
    text reports a unique-constraint violation and is empty otherwise.

    Args:
        exc: The exception raised while handling the request.
        context: Context dictionary forwarded to the default handler.

    Returns:
        The response built by the default handler, a 400 ``Response`` for
        integrity errors, or ``None`` when the exception is not handled.
    """
    response = exception_handler(exc, context)
    if response is not None or not isinstance(exc, IntegrityError):
        return response

    errors = []
    # ``str(exc)`` is used instead of ``exc.args[0]`` so that exceptions with
    # no arguments, or with a non-string first argument, do not raise here.
    if 'duplicate key value violates unique constraint' in str(exc):
        errors.append("Já existe um registro com essas informações.")
    return Response({"non_field_errors": errors}, status=status.HTTP_400_BAD_REQUEST)
import json
from rest_framework.response import Response
class ResponseCheckMiddleWare(object):
    """Middleware enforcing the ``data`` / ``message`` / ``page`` envelope.

    Rendered JSON ``Response`` objects whose body is not an object holding
    all three envelope keys are re-rendered with the original
    ``response.data`` placed under ``message``. Every other response is
    returned unchanged.
    """

    def __init__(self, get_response):
        self.get_response = get_response

    def __call__(self, request):
        """Run the wrapped handler and normalise its response if needed."""
        response = self.get_response(request)

        if not isinstance(response, Response):
            return response
        # ``accepted_media_type`` may be absent on the response object, so it
        # is looked up defensively instead of raising ``AttributeError``.
        if getattr(response, 'accepted_media_type', None) != 'application/json':
            return response
        # The body is only inspected once the response has been rendered.
        if not response._is_rendered:
            return response

        try:
            content = json.loads(response.content.decode('utf8'))
        except ValueError:
            # Empty body or otherwise unparsable content: nothing to
            # normalise, so pass the response through.
            return response

        api_interface = ['data', 'message', 'page']
        follows_interface = isinstance(content, dict) and all(
            key in content for key in api_interface
        )
        if not follows_interface:
            response.data = {
                'data': None,
                'message': response.data,
                'page': None,
            }
            # The private ``_is_rendered`` flag has to be reset so that
            # ``render()`` runs a second time with the new data.
            response._is_rendered = False
            response.render()
        return response
import uuid
import pendulum
from django.db import models
from simple_history.models import HistoricalRecords
from django.core.validators import MaxValueValidator, MinValueValidator
from django.contrib.gis.geos import Point
from django.contrib.gis.db import models as geo_models
# Spatial reference identifier used for every point stored by these models
# (4326 is the EPSG code of the WGS84 coordinate system).
SRID_WGS84 = 4326
# Time zone name handed to ``pendulum.now`` when stamping records.
TIMEZONE = 'America/Sao_Paulo'
class BaseModel(models.Model):
    """Abstract base model with a UUID primary key, timestamps and history.

    ``save`` refreshes ``updated_at`` with the current time in ``TIMEZONE``
    before delegating to Django's ``save``.
    """

    id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)
    # deleted_at = models.DateTimeField(blank=True, null=True)

    # NOTE(review): presumably read by django-simple-history to decide whether
    # a history row is written on save — confirm against the library docs.
    skip_history_when_saving = True

    objects = models.Manager()
    history = HistoricalRecords(inherit=True)

    class Meta:
        abstract = True

    def save(self, *args, **kwargs):
        """Stamp ``updated_at`` and persist the instance."""
        now = pendulum.now(TIMEZONE)
        self.updated_at = now
        super().save(*args, **kwargs)
class PointModelMixin(geo_models.Model):
    """Abstract mixin storing a location as latitude/longitude plus a point.

    On ``save`` the ``point`` field is rebuilt from ``latitude`` and
    ``longitude`` whenever both are set; otherwise ``point`` is left as is.
    """

    latitude = models.FloatField(validators=[MinValueValidator(-90), MaxValueValidator(90)],
                                 blank=True, null=True, verbose_name='Latitude')
    longitude = models.FloatField(validators=[MinValueValidator(-180), MaxValueValidator(180)],
                                  blank=True, null=True, verbose_name='Longitude')
    point = geo_models.PointField(srid=SRID_WGS84, null=True, blank=True, verbose_name='Ponto')

    class Meta:
        abstract = True

    def save(self, *args, **kwargs):
        """Synchronise ``point`` with the coordinates, then persist."""
        # Compare against ``None`` rather than relying on truthiness: 0 is a
        # valid latitude (equator) and a valid longitude (prime meridian).
        if self.latitude is not None and self.longitude is not None:
            self.latitude = float(self.latitude)
            self.longitude = float(self.longitude)
            # Point takes x = longitude, y = latitude.
            self.point = Point(x=self.longitude, y=self.latitude, srid=SRID_WGS84)
        super().save(*args, **kwargs)

    @property
    def coordinates(self):
        """Return ``[latitude, longitude]``."""
        return [self.latitude, self.longitude]
class BasePointModel(BaseModel, PointModelMixin):
    """Abstract model combining ``BaseModel`` with ``PointModelMixin``."""

    class Meta:
        abstract = True
from rest_framework import permissions
class IsAuthenticatedOrCreate(permissions.IsAuthenticated):
    """Let any client POST; defer to ``IsAuthenticated`` for other methods."""

    def has_permission(self, request, view):
        if request.method != 'POST':
            return super().has_permission(request, view)
        return True
class AnonListAndUpdateOwnerOnly(permissions.BasePermission):
    """
    Custom permission:
    - allow the ``list`` action for anyone, including anonymous users
    - require an authenticated user for every other action
    - at object level, allow ``retrieve``, ``update`` and ``partial_update``
      only when the object's ``id`` equals the requesting user's ``id``
    - allow every object-level action for staff
    """

    def has_permission(self, request, view):
        if view.action == 'list':
            return True
        # Coerce to bool so the user object (or None) is never returned.
        return bool(request.user and request.user.is_authenticated)

    def has_object_permission(self, request, view, obj):
        owner_actions = ['retrieve', 'update', 'partial_update']
        is_owner_action = view.action in owner_actions and obj.id == request.user.id
        return bool(is_owner_action or request.user.is_staff)
class ListAdminOnly(permissions.BasePermission):
    """
    Custom permission to only allow access to lists for admins.

    Every action other than ``list`` is allowed; ``list`` requires a staff
    user.
    """

    def has_permission(self, request, view):
        if view.action != 'list':
            return True
        # Coerce to bool so the user object (or None) is never returned.
        return bool(request.user and request.user.is_staff)
class IsAuthenticatedOrGet(permissions.IsAuthenticated):
    """Let any client GET; defer to ``IsAuthenticated`` for other methods."""

    def has_permission(self, request, view):
        if request.method != 'GET':
            return super().has_permission(request, view)
        return True
from django.core.management import BaseCommand
from django.contrib.auth import get_user_model
class Command(BaseCommand):
    """Management command that creates a default superuser if it is missing."""

    help = 'Create super user'

    def handle(self, *args, **options):
        """Create the ``superuser`` account unless its e-mail already exists.

        Writes a success message after creating the account, or an error
        message when an account with the same e-mail is already present.
        """
        model = get_user_model()
        email = 'superuser@admin.com'

        # Look for the same e-mail that is used on creation; checking a
        # different address would never match and a second run would try to
        # insert a duplicate user.
        if model.objects.filter(email=email).exists():
            self.stdout.write(self.style.ERROR('Superuser already created!'))
            return

        superuser = model(username='superuser', email=email,
                          is_staff=True, is_superuser=True)
        # NOTE(review): hard-coded credential — confirm this command is only
        # meant for development/test environments.
        superuser.set_password('@Admin123')
        superuser.save()
        self.stdout.write(self.style.SUCCESS('Superuser created!'))
# Python
import json
# Third party
from django.conf import settings
from django.contrib.auth import get_user_model
from rest_framework import renderers
from rest_framework_jwt.settings import api_settings
class DataRenderer(renderers.BaseRenderer):
    """Renderer wrapping every payload in a ``data``/``message``/``page`` envelope.

    Responses with a 2xx status put the payload under ``data`` (moving
    pagination keys of ``list`` actions to ``page``); every other status
    puts a flattened error list under ``message``.
    """

    media_type = 'application/json'
    format = 'txt'

    @staticmethod
    def _build_result():
        """Return an empty envelope."""
        return {
            'data': None,
            'message': None,
            'page': None,
        }

    @staticmethod
    def _build_data_results(result, view, data):
        """Move pagination keys from ``data`` into ``result['page']``.

        Only applies to ``list`` actions whose payload is a mapping holding
        ``count``, ``next`` and ``previous``; the keys are popped from
        ``data`` in place. Any other payload (e.g. a plain list) is returned
        untouched.
        """
        page_keys = ('count', 'next', 'previous')
        is_list_action = getattr(view, 'action', None) == 'list'
        if is_list_action and isinstance(data, dict) and all(key in data for key in page_keys):
            result['page'] = {key: data.pop(key) for key in page_keys}
        return data

    def _build_data(self, result, view, data):
        """Return the value stored under ``data`` for successful responses."""
        return self._build_data_results(result, view, data)

    @staticmethod
    def _build_message(data):
        """Flatten an error payload into ``{'non_field_errors': [...]}``.

        Mapping entries become ``"<key>: <error>"`` strings, except entries
        already under ``non_field_errors`` which are kept without a prefix.
        A list payload contributes one entry per item, any other non-``None``
        value a single entry, and ``None`` yields an empty list.
        """
        errors = []
        message = {'non_field_errors': errors}

        if data is None:
            return message
        if not isinstance(data, dict):
            # Error payloads are not always mappings; accept lists and
            # scalars instead of failing on ``.items()``.
            items = data if isinstance(data, (list, tuple)) else [data]
            errors.extend(f'{item}' for item in items)
            return message

        for key, value in data.items():
            if not isinstance(value, list):
                errors.append(f'{key}: {value}')
                continue
            for error in value:
                if key == 'non_field_errors':
                    errors.append(f'{error}')
                else:
                    errors.append(f'{key}: {error}')
        return message

    def render(self, data, media_type=None, renderer_context=None):
        """Serialize ``data`` inside the envelope as a JSON string.

        The status code is read from ``renderer_context['response']`` and the
        view from ``renderer_context['view']``.
        """
        result = self._build_result()
        status_code = renderer_context.get('response').status_code
        if 300 > status_code >= 200:
            view = renderer_context.get('view')
            result['data'] = self._build_data(result, view, data)
        else:
            result['message'] = self._build_message(data)
        return json.dumps(result)
class DataRendererJWT(DataRenderer):
    """``DataRenderer`` variant that attaches a JWT to user-creation payloads."""

    @staticmethod
    def _build_data_token(view, data):
        """Add a ``token`` entry to ``data`` for ``create`` on the ``user`` view.

        The user is looked up through the field named by the
        ``AUTH_USER_UNIQUE_FIELD`` setting, using the value found under that
        same key in ``data``. Other views/actions get ``data`` back unchanged.

        Raises:
            ImportError: If the user model is ``None`` or the
                ``AUTH_USER_UNIQUE_FIELD`` setting is not configured.
        """
        creates_user = (
            hasattr(view, 'action')
            and view.action == 'create'
            and view.basename == 'user'
        )
        if not creates_user:
            return data

        make_payload = api_settings.JWT_PAYLOAD_HANDLER
        encode = api_settings.JWT_ENCODE_HANDLER

        user_model = get_user_model()
        if user_model is None:
            raise ImportError('Configure a variável "AUTH_USER_MODEL" para utilizar no JWT')

        user_unique_field = getattr(settings, 'AUTH_USER_UNIQUE_FIELD', None)
        if user_unique_field is None:
            raise ImportError('Configure a variável "AUTH_USER_UNIQUE_FIELD" para utilizar no JWT')

        lookup = {user_unique_field: data.get(user_unique_field)}
        user = user_model.objects.get(**lookup)
        data['token'] = encode(make_payload(user))
        return data

    def _build_data(self, result, view, data):
        """Apply pagination extraction, then token injection."""
        paged = self._build_data_results(result, view, data)
        return self._build_data_token(view, paged)
import uuid
from rest_framework import serializers
class ModelField(serializers.Field):
    """Serializer field exposing a model instance through its UUID ``id``.

    Args:
        model: Model class whose instances this field represents.
    """

    def __init__(self, model, *args, **kwargs):
        self.model = model
        super().__init__(*args, **kwargs)

    def to_representation(self, value):
        """Return the instance's ``id`` as a string."""
        assert isinstance(value, self.model)
        return str(value.id)

    def to_internal_value(self, data):
        """Resolve incoming ``data`` to an instance of ``self.model``.

        Raises:
            serializers.ValidationError: If ``data`` is not a valid UUID or
                no instance with that ``id`` exists.
        """
        try:
            instance_id = data if isinstance(data, uuid.UUID) else uuid.UUID(data)
        except (ValueError, TypeError, AttributeError) as exc:
            # ``uuid.UUID`` raises TypeError/AttributeError for non-string
            # input (None, numbers, lists); report those as invalid input
            # too rather than letting them escape as server errors.
            raise serializers.ValidationError('UUID inválido') from exc

        try:
            return self.model.objects.get(id=instance_id)
        except self.model.DoesNotExist as exc:
            raise serializers.ValidationError(f'{self.model.__name__} não existe') from exc
# Python
from unittest import mock, skip
# Third party
from django.test import TestCase
from rest_framework.response import Response
# Project
from soirtec_drf_lib.api.middlewares import ResponseCheckMiddleWare
@skip("Skiped due lack of knowledge")
class ResponseCheckMiddleWareTestCase(TestCase):
    """Tests for ``ResponseCheckMiddleWare`` (currently skipped)."""

    def setUp(self):
        self.first_interface_layer_keys = ['data', 'message', 'page']
        self.data_correct_interface = {
            'data': 'test_data',
            'message': 'test_message',
            'page': 'test_page',
        }
        self.data_incorrect_interface = {'test_data': 'test_data_ok'}

        # TODO: find a way to make the middleware's
        # ``isinstance(response, Response)`` check succeed for this double.
        self.mocked_response = Response()
        self.mocked_response.render = mock.MagicMock(return_value='mocked_render')
        self.mocked_get_response = mock.MagicMock(return_value=self.mocked_response)

    def _run_middleware(self, response_data):
        """Mark the fake response as rendered and pass it through the middleware."""
        self.mocked_response._is_rendered = True
        self.mocked_response.data = response_data
        middleware = ResponseCheckMiddleWare(self.mocked_get_response)
        return middleware('fake_request').data

    @mock.patch('api.renderers.User.objects.get')
    @mock.patch('api.renderers.api_settings.JWT_PAYLOAD_HANDLER')
    @mock.patch('api.renderers.api_settings.JWT_ENCODE_HANDLER', return_value='test_token')
    def test_middleware_with_correct_rendered_interface_data(self, *mocks):
        expected = self.data_correct_interface
        self.assertEqual(self._run_middleware(expected), expected)

    def test_middleware_with_incorrect_rendered_interface_data(self):
        result = self._run_middleware(self.data_incorrect_interface)
        wrapped = {
            'data': None,
            'message': self.data_incorrect_interface,
            'page': None,
        }
        self.assertEqual(result, wrapped)

    def test_middleware_with_not_rendered_interface_data(self):
        pass
# Python
from copy import deepcopy
from unittest import mock
import json
# Third Party
from django.test import TestCase
# Project
from soirtec_drf_lib.api.renderers import DataRenderer, DataRendererJWT
from soirtec_drf_lib.api.factories import UserFactory
class DataRendererJWTTestCase(TestCase):
    """Tests for ``DataRendererJWT`` across the HTTP status code ranges."""

    def setUp(self):
        self.first_interface_layer_keys = ['data', 'message', 'page']
        self.data_without_pages = {'teste_data': 'teste_data_ok'}
        self.data_with_pages = {
            'results': {
                'teste_data': 'teste_data_ok',
            },
            'count': 'teste_count_ok',
            'next': 'teste_next_ok',
            'previous': 'teste_previous_ok',
        }
        self.content_list = {
            'results': {
                'teste_data': 'teste_data_ok',
            }
        }
        self.data_error = {'non_field_errors': ['teste_data_error']}
        self.mocked_response = mock.MagicMock()
        self.mocked_view = mock.MagicMock()
        self.renderer_context = {
            'response': self.mocked_response,
            'view': self.mocked_view
        }
        self.user_data = {
            'username': 'user0',
            'email': 'user0@teste.com.br',
            'password': 'user0password'
        }
        self.user = UserFactory(**self.user_data)
        self.user.is_active = True
        self.user.save()
        self.data_without_pages['username'] = self.user_data.get('username')
        self.data_renderer = DataRendererJWT()

    def _render(self, payload, status_code):
        """Render a copy of ``payload`` for ``status_code`` and decode the JSON."""
        self.mocked_response.status_code = status_code
        rendered = self.data_renderer.render(
            deepcopy(payload),
            renderer_context=deepcopy(self.renderer_context)
        )
        return json.loads(rendered)

    def _assert_first_layer(self, data):
        """Check that the envelope only holds the expected top-level keys."""
        self.assertTrue(all(key in self.first_interface_layer_keys for key in data.keys()))

    def _assert_error_envelope(self, status_codes):
        """Check that every status in ``status_codes`` yields an error envelope."""
        self.mocked_view.action = 'create'
        self.mocked_view.basename = 'user'
        for status_code in status_codes:
            with self.subTest(status_code=status_code):
                data = self._render(self.data_error, status_code)
                self._assert_first_layer(data)
                self.assertEqual(data.get('message'), self.data_error)
                self.assertEqual(data.get('page'), None)
                self.assertEqual(data.get('data'), None)

    def test_renderer_response_200_to_299_and_data_without_pages(self):
        self.mocked_view.action = 'create'
        self.mocked_view.basename = 'user'
        # range() excludes its stop value, so 300 is needed to cover 299.
        for status_code in range(200, 300):
            with self.subTest(status_code=status_code):
                data = self._render(self.data_without_pages, status_code)
                self._assert_first_layer(data)
                self.assertTrue(data.get('data').get('token'))
                del data['data']['token']
                self.assertEqual(data.get('data'), self.data_without_pages)
                self.assertEqual(data.get('message'), None)
                self.assertEqual(data.get('page'), None)

    def test_renderer_response_200_to_299_and_data_ok_with_pages(self):
        self.mocked_view.action = 'list'
        for status_code in range(200, 300):
            with self.subTest(status_code=status_code):
                data = self._render(self.data_with_pages, status_code)
                # Testing first layer
                self._assert_first_layer(data)
                self.assertEqual(data.get('data'), self.content_list)
                self.assertEqual(data.get('message'), None)
                # Testing page layer
                page = data.get('page')
                self.assertTrue(all(key in ['count', 'next', 'previous'] for key in page.keys()))
                self.assertEqual(page.get('count'), self.data_with_pages.get('count'))
                self.assertEqual(page.get('next'), self.data_with_pages.get('next'))
                self.assertEqual(page.get('previous'), self.data_with_pages.get('previous'))

    def test_renderer_response_0_to_199(self):
        self._assert_error_envelope(range(0, 200))

    def test_renderer_response_300_to_599(self):
        self._assert_error_envelope(range(300, 600))
class DataRendererTestCase(TestCase):
    """Tests for ``DataRenderer`` across the HTTP status code ranges."""

    def setUp(self):
        self.first_interface_layer_keys = ['data', 'message', 'page']
        self.data_without_pages = {'teste_data': 'teste_data_ok'}
        self.data_with_pages = {
            'results': {
                'teste_data': 'teste_data_ok',
            },
            'count': 'teste_count_ok',
            'next': 'teste_next_ok',
            'previous': 'teste_previous_ok',
        }
        self.content_list = {
            'results': {
                'teste_data': 'teste_data_ok',
            }
        }
        self.data_error = {'non_field_errors': ['teste_data_error']}
        self.mocked_response = mock.MagicMock()
        self.mocked_view = mock.MagicMock()
        self.renderer_context = {
            'response': self.mocked_response,
            'view': self.mocked_view
        }
        self.data_renderer = DataRenderer()

    def _render(self, payload, status_code):
        """Render a copy of ``payload`` for ``status_code`` and decode the JSON."""
        self.mocked_response.status_code = status_code
        rendered = self.data_renderer.render(
            deepcopy(payload),
            renderer_context=deepcopy(self.renderer_context)
        )
        return json.loads(rendered)

    def _assert_first_layer(self, data):
        """Check that the envelope only holds the expected top-level keys."""
        self.assertTrue(all(key in self.first_interface_layer_keys for key in data.keys()))

    def _assert_error_envelope(self, status_codes):
        """Check that every status in ``status_codes`` yields an error envelope."""
        self.mocked_view.action = 'create'
        for status_code in status_codes:
            with self.subTest(status_code=status_code):
                data = self._render(self.data_error, status_code)
                self._assert_first_layer(data)
                self.assertEqual(data.get('message'), self.data_error)
                self.assertEqual(data.get('page'), None)
                self.assertEqual(data.get('data'), None)

    def test_renderer_response_200_to_299_and_data_without_pages(self):
        self.mocked_view.action = 'create'
        # range() excludes its stop value, so 300 is needed to cover 299.
        for status_code in range(200, 300):
            with self.subTest(status_code=status_code):
                data = self._render(self.data_without_pages, status_code)
                self._assert_first_layer(data)
                self.assertEqual(data.get('data'), self.data_without_pages)
                self.assertEqual(data.get('message'), None)
                self.assertEqual(data.get('page'), None)

    def test_renderer_response_200_to_299_and_data_ok_with_pages(self):
        self.mocked_view.action = 'list'
        for status_code in range(200, 300):
            with self.subTest(status_code=status_code):
                data = self._render(self.data_with_pages, status_code)
                # Testing first layer
                self._assert_first_layer(data)
                self.assertEqual(data.get('data'), self.content_list)
                self.assertEqual(data.get('message'), None)
                # Testing page layer
                page = data.get('page')
                self.assertTrue(all(key in ['count', 'next', 'previous'] for key in page.keys()))
                self.assertEqual(page.get('count'), self.data_with_pages.get('count'))
                self.assertEqual(page.get('next'), self.data_with_pages.get('next'))
                self.assertEqual(page.get('previous'), self.data_with_pages.get('previous'))

    def test_renderer_response_0_to_199(self):
        self._assert_error_envelope(range(0, 200))

    def test_renderer_response_300_to_599(self):
        self._assert_error_envelope(range(300, 600))
import json
from pydoc import locate
from django.conf import settings
from django.test.runner import DiscoverRunner
from rest_framework.test import APITestCase
from rest_framework.response import Response
from rest_framework_jwt.settings import api_settings
class MyTestSuiteRunner(DiscoverRunner):
    """Test runner that sets ``settings.TEST = True`` when instantiated."""

    def __init__(self, *args, **kwargs):
        # Forward the arguments to the base runner; calling the parent with
        # no arguments would silently discard every option passed in.
        super().__init__(*args, **kwargs)
        settings.TEST = True
class BaseAPITestCase(APITestCase):
    """Base API test case with helpers for building paths and reading bodies.

    Subclasses are expected to set ``self.endpoint`` in their ``setUp``.
    """

    def setUp(self):
        self.endpoint = None

    def get_path(self, id_detail=None, action=None, filter=None):
        """Build ``/<endpoint>/[<id_detail>/][<action>/][?<filter>]``.

        Raises:
            AttributeError: If ``self.endpoint`` has not been set.
        """
        if not self.endpoint:
            raise AttributeError('Endpoint não definido')

        pieces = [f'/{self.endpoint}/']
        if id_detail:
            pieces.append(f'{id_detail}/')
        if action:
            pieces.append(f'{action}/')
        if filter:
            pieces.append(f'?{filter}')
        return ''.join(pieces)

    @staticmethod
    def get_content(response: Response):
        """Decode the JSON body of ``response``."""
        raw = response.content
        return json.loads(raw)
class BaseAPIJWTTestCase(BaseAPITestCase):
    """API test case that creates a user and a JWT ``Bearer`` header value.

    The user is built with the factory named by the ``AUTH_USER_FACTORY``
    setting (a dotted import path). After ``setUp`` the instance exposes
    ``self.user``, ``self.token`` and ``self.auth``.
    """

    def setUp(self):
        """Create the user and its token.

        Raises:
            ImportError: If ``AUTH_USER_FACTORY`` is missing, empty, or does
                not resolve to an importable object.
        """
        super().setUp()

        # Read the setting defensively so a missing setting produces the same
        # explicit configuration error as an unresolvable path.
        factory_path = getattr(settings, 'AUTH_USER_FACTORY', None)
        user_factory = locate(factory_path) if factory_path else None
        if user_factory is None:
            raise ImportError('Configure a variável "AUTH_USER_FACTORY" para utilizar no JWT')

        self.user = user_factory()
        self.user.set_password('test')
        self.user.save()

        jwt_payload_handler = api_settings.JWT_PAYLOAD_HANDLER
        jwt_encode_handler = api_settings.JWT_ENCODE_HANDLER
        payload = jwt_payload_handler(self.user)
        self.token = jwt_encode_handler(payload)
        self.auth = 'Bearer {}'.format(self.token)
# Third party
from rest_framework import status
# Project
from .api.test_utils import BaseAPITestCase
from .models import House
from .factories import HouseFactory
class HousesAPITestCase(BaseAPITestCase):
    """CRUD tests for the ``houses`` endpoint."""

    def setUp(self):
        super().setUp()
        self.endpoint = 'houses'

    # CREATE
    def test_create_success(self):
        self.assertEqual(House.objects.count(), 0)

        payload = {
            'name': 'Test',
            'address': 'Brazil'
        }
        response = self.client.post(self.get_path(), data=payload)

        self.assertEqual(response.status_code, status.HTTP_201_CREATED, msg=response.data)
        self.assertEqual(House.objects.count(), 1)
        self.assertEqual(House.objects.first().name, 'Test')

    def test_duplicate_fail(self):
        HouseFactory(name='Test', address='Brazil')
        self.assertEqual(House.objects.count(), 1)
        self.assertEqual(House.objects.first().name, 'Test')
        self.assertEqual(House.objects.first().address, 'Brazil')

        payload = {
            'name': 'Test',
            'address': 'Brazil',
        }
        response = self.client.post(self.get_path(), data=payload)

        self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST, msg=response.data)
        self.assertEqual(House.objects.count(), 1)
        self.assertEqual(House.objects.first().name, 'Test')

    def test_create_fail(self):
        self.assertEqual(House.objects.count(), 0)

        payload = {
            'name': 'Test'
        }
        response = self.client.post(self.get_path(), data=payload)

        self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST, msg=response.data)
        self.assertEqual(House.objects.count(), 0)

    # PUT
    def test_put_success(self):
        house = HouseFactory(name='original')
        self.assertEqual(House.objects.count(), 1)
        self.assertEqual(House.objects.first().name, 'original')

        payload = {
            'name': 'modificado',
            'address': 'Brazil'
        }
        url = self.get_path(id_detail=house.id)
        response = self.client.put(url, data=payload)

        self.assertEqual(response.status_code, status.HTTP_200_OK, msg=response.data)
        self.assertEqual(House.objects.count(), 1)
        self.assertEqual(House.objects.first().name, 'modificado')

    def test_put_fail(self):
        house = HouseFactory(name='original')
        self.assertEqual(House.objects.count(), 1)
        self.assertEqual(House.objects.first().name, 'original')

        payload = {
            'name': 'modificado'
        }
        url = self.get_path(id_detail=house.id)
        response = self.client.put(url, data=payload)

        self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST, msg=response.data)
        self.assertEqual(House.objects.count(), 1)
        self.assertEqual(House.objects.first().name, 'original')

    # PATCH
    def test_patch_success(self):
        house = HouseFactory(name='original')
        self.assertEqual(House.objects.count(), 1)
        self.assertEqual(House.objects.first().name, 'original')

        payload = {
            'name': 'modificado'
        }
        url = self.get_path(id_detail=house.id)
        response = self.client.patch(url, data=payload)

        self.assertEqual(response.status_code, status.HTTP_200_OK, msg=response.data)
        self.assertEqual(House.objects.count(), 1)
        self.assertEqual(House.objects.first().name, 'modificado')

    def test_patch_fail(self):
        house = HouseFactory(name='original')
        self.assertEqual(House.objects.count(), 1)
        self.assertEqual(House.objects.first().name, 'original')

        payload = {
            'name': ''
        }
        url = self.get_path(id_detail=house.id)
        response = self.client.patch(url, data=payload)

        self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST, msg=response.data)
        self.assertEqual(House.objects.count(), 1)
        self.assertEqual(House.objects.first().name, 'original')

    # LIST
    def test_list_success(self):
        HouseFactory.create_batch(3)
        self.assertEqual(House.objects.count(), 3)

        response = self.client.get(self.get_path())

        self.assertEqual(response.status_code, status.HTTP_200_OK, msg=response.data)
        self.assertEqual(len(response.data.get('results')), 3, msg=response.data)

    # GET
    def test_get_success(self):
        house = HouseFactory(name='titulo')
        self.assertEqual(House.objects.count(), 1)

        url = self.get_path(id_detail=house.id)
        response = self.client.get(url)

        self.assertEqual(response.status_code, status.HTTP_200_OK, msg=response.data)
        self.assertEqual(response.data.get('name'), 'titulo')

    # DELETE
    def test_delete_success(self):
        house = HouseFactory(name='titulo')
        self.assertEqual(House.objects.count(), 1)

        url = self.get_path(id_detail=house.id)
        response = self.client.delete(url)

        self.assertEqual(response.status_code, status.HTTP_204_NO_CONTENT, msg=response.data)
        self.assertEqual(House.objects.count(), 0)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment