Skip to content

Instantly share code, notes, and snippets.

@codeboy
Created October 2, 2018 08:59
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save codeboy/d39412419fbe94566a75e5dc96a6e77c to your computer and use it in GitHub Desktop.
Save codeboy/d39412419fbe94566a75e5dc96a6e77c to your computer and use it in GitHub Desktop.
import base64
import os
import coreapi
import coreschema
import re
import requests
from urllib.parse import urlencode
from django.contrib.auth.forms import PasswordResetForm
from django.contrib.auth.tokens import PasswordResetTokenGenerator
from django.contrib.sites.shortcuts import get_current_site
from django.core.files.base import ContentFile
from django.core.validators import EmailValidator
from django.db.models import Q
from django.shortcuts import render, get_object_or_404
from django.contrib.auth import authenticate, login
from django.template.loader import render_to_string
from django.urls import reverse
from django.utils.encoding import force_text, force_bytes
from django.utils.http import urlsafe_base64_encode
from django.utils.translation import ugettext_lazy as _
# Create your views here.
from rest_framework import permissions
from rest_framework import viewsets
from rest_framework import parsers
from rest_framework import mixins
from rest_framework.decorators import list_route, detail_route
from rest_framework.fields import SkipField
from rest_framework.filters import BaseFilterBackend
from rest_framework.generics import ListAPIView, CreateAPIView, GenericAPIView, \
UpdateAPIView, RetrieveAPIView, RetrieveUpdateAPIView
from rest_framework.pagination import LimitOffsetPagination
from rest_framework.response import Response
from rest_framework import status
from rest_framework import serializers
from rest_framework.views import APIView
from urllib.parse import urlparse
from common import utils as common_utils
from common.vt_mail import mail_send
from . import models as core_models
class ExLimitOffsetPagination(LimitOffsetPagination):
    """Limit/offset pagination shared by the editable viewsets below.

    ``default_limit`` is the page size applied when the client sends no
    ``limit``; ``max_limit`` is the largest ``limit`` a client may request.
    """

    default_limit = 10
    max_limit = 20
def _paginate_cls(i_max_limit=20, i_default_limit=10):
    """Build a ``LimitOffsetPagination`` subclass with the given limits.

    :param i_max_limit: value stored as the subclass's ``max_limit``.
    :param i_default_limit: value stored as the subclass's ``default_limit``.
    :return: a pagination class; every call creates a new class object.
    """
    class _LimitOffsetPagination(LimitOffsetPagination):
        default_limit = i_default_limit
        max_limit = i_max_limit

    return _LimitOffsetPagination
class BaseFilter(BaseFilterBackend):
    """Filter backend base that advertises schema fields per view action.

    Subclasses override ``action_map`` with ``{action name: [fields]}``.
    """

    # Maps a view action name to the list of schema fields shown for it.
    action_map = {}

    def get_schema_fields(self, view):
        """Return the fields registered for ``view.action``.

        An action without an entry in ``action_map`` yields an empty list.
        """
        fields_by_action = self.action_map
        current_action = view.action
        if current_action in fields_by_action:
            return fields_by_action[current_action]
        return []
class MeasureViewSet(viewsets.ReadOnlyModelViewSet):
    """
    Measure - unit of measurement
    """
    pagination_class = _paginate_cls(1000, 1000)
    permission_classes = [permissions.IsAuthenticated]

    def get_serializer_class(self):
        """Build the serializer exposing a measure's id and both titles."""
        measure_model = core_models.Measure

        class _Serializer(serializers.ModelSerializer):
            class Meta:
                fields = ['id', 'full_title', 'short_title']
                model = measure_model

        return _Serializer

    def get_queryset(self):
        """Return every ``Measure`` row; no filtering is applied here."""
        all_measures = core_models.Measure.objects.all()
        return all_measures
class MaterialViewSet(viewsets.ReadOnlyModelViewSet):
    """
    Material - materials
    """
    pagination_class = _paginate_cls(1000, 1000)
    permission_classes = [permissions.IsAuthenticated]

    def get_serializer_class(self):
        """Build the serializer exposing a material's ``id`` and ``title``."""
        material_model = core_models.Material

        class _Serializer(serializers.ModelSerializer):
            class Meta:
                fields = ['id', 'title']
                model = material_model

        return _Serializer

    def get_queryset(self):
        """Return every ``Material`` row; no filtering is applied here."""
        all_materials = core_models.Material.objects.all()
        return all_materials
class BuildingObjectViewSet(viewsets.ModelViewSet):
    """
    BuildingObject - building object (construction site)
    """
    pagination_class = ExLimitOffsetPagination
    permission_classes = [permissions.IsAuthenticated]

    def get_serializer_class(self):
        """Build the serializer exposing ``id``, ``name`` and ``user``."""
        building_model = core_models.BuildingObject

        class _Serializer(serializers.ModelSerializer):
            class Meta:
                # A wider exclude-based field set was tried earlier and
                # dropped in favour of this explicit list.
                fields = ['id', 'name', 'user']
                model = building_model

        return _Serializer

    def get_queryset(self):
        """Return every ``BuildingObject`` row.

        NOTE(review): filtering by ``user=self.request.user`` existed here
        as a commented-out alternative; currently all rows are visible to
        any authenticated user - confirm that this is intended.
        """
        every_object = core_models.BuildingObject.objects.all()
        return every_object
class ConstructionObjectViewSet(viewsets.ModelViewSet):
    """
    ConstructionObject - name of a structure
    """
    pagination_class = ExLimitOffsetPagination
    permission_classes = [permissions.IsAuthenticated]

    def get_serializer_class(self):
        """Build the serializer exposing id, building object and title."""
        construction_model = core_models.ConstructionObject

        class _Serializer(serializers.ModelSerializer):
            class Meta:
                fields = ['id', 'building_object', 'title']
                model = construction_model

        return _Serializer

    def get_queryset(self):
        """Return every ``ConstructionObject`` row without filtering."""
        every_construction = core_models.ConstructionObject.objects.all()
        return every_construction
class ObjectWorkViewSet(viewsets.ModelViewSet):
    """
    ObjectWork - work performed on an object
    """
    pagination_class = ExLimitOffsetPagination
    permission_classes = [permissions.IsAuthenticated]

    def get_serializer_class(self):
        """Build the serializer exposing ``id``, ``name`` and ``status``."""
        work_model = core_models.ObjectWork

        class _Serializer(serializers.ModelSerializer):
            class Meta:
                fields = ['id', 'name', 'status']
                model = work_model

        return _Serializer

    def get_queryset(self):
        """Return every ``ObjectWork`` row without filtering."""
        every_work = core_models.ObjectWork.objects.all()
        return every_work
class WorkMaterialViewSet(viewsets.ModelViewSet):
    """
    WorkMaterial - materials for work
    """
    pagination_class = ExLimitOffsetPagination
    permission_classes = [permissions.IsAuthenticated]

    def get_serializer_class(self):
        """Build the serializer exposing id, work, material, planed_value."""
        work_material_model = core_models.WorkMaterial

        class _Serializer(serializers.ModelSerializer):
            class Meta:
                fields = ['id', 'work', 'material', 'planed_value']
                model = work_material_model

        return _Serializer

    def get_queryset(self):
        """Return every ``WorkMaterial`` row without filtering."""
        every_work_material = core_models.WorkMaterial.objects.all()
        return every_work_material
class ProducedWorkViewSet(viewsets.ModelViewSet):
    """
    ProducedWork - work that has been carried out
    """
    # The previous docstring was copied from WorkMaterialViewSet, so the
    # endpoint was described under the wrong model name.
    permission_classes = [permissions.IsAuthenticated]
    pagination_class = ExLimitOffsetPagination

    def get_queryset(self):
        """Return every ``ProducedWork`` row without filtering."""
        return core_models.ProducedWork.objects.all()

    def get_serializer_class(self):
        """Build the serializer exposing id, work, done_at, material, value."""
        class _Serializer(serializers.ModelSerializer):
            class Meta:
                model = core_models.ProducedWork
                fields = ['id', 'work', 'done_at', 'material', 'value']

        return _Serializer
from thirdparty.cutstock.cuttingstock.GreedySolver import *
#todo: rework response for statuses and content
class CutCalculateViewSet(viewsets.GenericViewSet):
    """Cutting calculator: fits requested pieces into stock of a set length.

    The calculation itself is delegated to ``GreedySolver``; this class
    validates the request and groups identical cutting patterns.
    """
    permission_classes = [permissions.IsAuthenticated]

    def get_serializer_class(self):
        """Return the request serializer used by ``cut_log``."""
        class _Serializer(serializers.Serializer):
            # _calculate() reads 'size' and 'quantity' from every item, so
            # the help text shows that shape instead of a flat number list.
            cut_pieces = serializers.ListField(
                help_text='list like - [{"size": 1, "quantity": 2}]')
            cut_length = serializers.IntegerField()

        return _Serializer

    @staticmethod
    def _error_response(content):
        """Wrap ``content`` in the ``{'status': 'error', ...}`` envelope.

        The HTTP status is left at 200, exactly as before; the outcome is
        reported through the ``status`` key of the body.
        """
        return Response({'status': 'error', 'content': content})

    @list_route(['post'])
    def cut_log(self, request, *args, **kwargs):
        """Calculate cutting patterns for the posted pieces.

        Request body: ``cut_pieces`` (list of ``{'size', 'quantity'}``
        items) and ``cut_length`` (integer).

        :return: ``{'status': 'ok', 'content': {...}}`` with the echoed
            input, the grouped ``result`` and ``count_logs``; otherwise
            ``{'status': 'error', 'content': <errors>}``.
        """
        srz = self.get_serializer(data=request.data)
        # Validate first: a body that is not a mapping fails here and never
        # reaches the ``request.data.get`` calls below.
        if not srz.is_valid():
            return self._error_response(srz.errors)

        cut_pieces = srz.validated_data['cut_pieces']
        cut_length = srz.validated_data['cut_length']
        # Previously these cases returned an error with empty content.
        if not cut_pieces:
            return self._error_response(
                {'cut_pieces': [_('This list may not be empty.')]})
        if cut_length < 1:
            return self._error_response(
                {'cut_length': [_('Must be a positive integer.')]})

        try:
            result, count_logs = self._calculate(cut_pieces, cut_length)
        except serializers.ValidationError as exc:
            return self._error_response(exc.detail)

        r_content = {
            # Echo the values exactly as the client sent them.
            'cut_pieces': request.data.get('cut_pieces', False),
            'cut_length': request.data.get('cut_length', False),
            'result': result,
            'count_logs': count_logs,
        }
        return Response({'status': 'ok', 'content': r_content})

    @staticmethod
    def _collect_pieces(cut_pieces):
        """Turn the posted pieces into a ``{size: quantity}`` dict.

        The first occurrence of a size wins; entries whose size or
        quantity is below 1 are skipped.

        :raises serializers.ValidationError: when an item is not a mapping
            with integer-convertible ``size`` and ``quantity``.
        """
        cut_input = {}
        for piece in cut_pieces:
            try:
                quantity = int(piece['quantity'])
                size = int(piece['size'])
            except (KeyError, TypeError, ValueError):
                raise serializers.ValidationError({'cut_pieces': [
                    _('Each piece needs integer "size" and "quantity".')]})
            # Duplicate sizes are ignored (todo: rework in frontend).
            if size not in cut_input and quantity >= 1 and size >= 1:
                cut_input[size] = quantity
        return cut_input

    def _calculate(self, cut_pieces, cut_length):
        """Run the solver and group identical cutting patterns.

        :param cut_pieces: iterable of ``{'size': ..., 'quantity': ...}``.
        :param cut_length: stock length; converted with ``int()``.
        :return: ``(final_cut, count_logs)`` where ``final_cut`` is a list
            of dicts with ``combination``, ``combination_clean``,
            ``count`` and ``waste``, and ``count_logs`` is the number of
            patterns the solver returned.
        :raises serializers.ValidationError: for malformed pieces.
        """
        cut_input = self._collect_pieces(cut_pieces)
        cut_length = int(cut_length)
        # NOTE(review): pieces longer than cut_length and an empty
        # cut_input are passed to the solver unchecked - confirm that
        # GreedySolver copes with both.
        patterns = GreedySolver(cut_input, cut_length).getResult()

        seen = []      # distinct size_qty values in first-seen order
        final_cut = []  # result rows, parallel to ``seen``
        for combination in patterns:
            sizes = combination.size_qty
            try:
                row = final_cut[seen.index(sizes)]
            except ValueError:
                # First time this pattern shows up.
                seen.append(sizes)
                final_cut.append({
                    'combination': sizes,
                    'combination_clean': combination.printCombi(),
                    'count': 1,
                    'waste': cut_length - combination.getCombinationSize(),
                })
            else:
                row['count'] += 1
        return final_cut, len(patterns)
###################################################################
class Base64FileField(serializers.FileField):
    """File field that also accepts a base64-encoded string.

    The string form handled here looks like
    ``file:<name>;...;base64,<payload>`` (the custom format noted in the
    original code). Any other value goes to ``serializers.FileField``
    unchanged.
    """

    def to_internal_value(self, data):
        """Decode a base64 string into a ``ContentFile``, then validate it.

        :raises serializers.ValidationError: with the field's ``invalid``
            message when the string is malformed or not valid base64.
        """
        if isinstance(data, str) and 'base64' in data[0:100]:
            # Exactly one ';base64,' separator is expected; anything else
            # used to surface as an unhandled ValueError.
            parts = data.split(';base64,')
            if len(parts) != 2:
                self.fail('invalid')
            header, filestr = parts
            filename = header.split(';')[0].split(':')[-1]  # guess file name
            try:
                # binascii.Error is a ValueError subclass.
                decoded = base64.b64decode(filestr)
            except ValueError:
                self.fail('invalid')
            data = ContentFile(decoded, name=filename)
        return super(Base64FileField, self).to_internal_value(data)
class Base64ImageField(serializers.ImageField):
    """Image field that also accepts a ``data:image/...;base64,`` string.

    Any other value goes to ``serializers.ImageField`` unchanged.
    """

    def to_internal_value(self, data):
        """Decode a base64 data URI into a ``ContentFile``, then validate.

        :raises serializers.ValidationError: with the field's
            ``invalid_image`` message when the string is malformed or not
            valid base64.
        """
        if isinstance(data, str) and data.startswith('data:image'):
            # Exactly one ';base64,' separator is expected; anything else
            # used to surface as an unhandled ValueError.
            parts = data.split(';base64,')
            if len(parts) != 2:
                self.fail('invalid_image')
            header, imgstr = parts  # header ~= data:image/X
            ext = header.split('/')[-1]  # guess file extension
            try:
                # binascii.Error is a ValueError subclass.
                decoded = base64.b64decode(imgstr)
            except ValueError:
                self.fail('invalid_image')
            data = ContentFile(decoded, name='temp.' + ext)
        return super(Base64ImageField, self).to_internal_value(data)
class ExCharField(serializers.CharField):
    """``CharField`` whose value is never read back from an instance."""

    def get_attribute(self, instance):
        """Raise ``SkipField`` instead of reading from ``instance``."""
        raise SkipField
class ExEmailField(serializers.EmailField):
    """``EmailField`` whose value is never read back from an instance."""

    def get_attribute(self, instance):
        """Raise ``SkipField`` instead of reading from ``instance``."""
        raise SkipField
class ExBooleanField(serializers.BooleanField):
    """``BooleanField`` whose value is never read back from an instance."""

    def get_attribute(self, instance):
        """Raise ``SkipField`` instead of reading from ``instance``."""
        raise SkipField
class BaseDictViewSet(viewsets.ReadOnlyModelViewSet):
    """Read-only base for dictionary models that have a ``title`` field.

    Subclasses set ``queryset``; the serializer is derived from its model
    and exposes ``id`` and ``title``. The list can be narrowed with the
    ``term`` query parameter.
    """

    class BaseDictBackend(BaseFilter):
        """Filter the list by a case-insensitive ``title`` substring."""
        action_map = {
            'list': [
                coreapi.Field(
                    name='term',
                    # The value is read from request.query_params below,
                    # so the schema must declare it as a query parameter
                    # (it was declared as 'path').
                    location='query',
                    required=False,
                    type='string'
                )
            ]
        }

        def filter_queryset(self, request, queryset, view):
            """Apply ``title__icontains=term`` when ``term`` is non-empty."""
            term = request.query_params.get('term')
            if term:
                return queryset.filter(title__icontains=term)
            return queryset

    permission_classes = [permissions.AllowAny]
    pagination_class = _paginate_cls(1000, 1000)
    filter_backends = [BaseDictBackend]

    def get_serializer_class(self):
        """Build an ``id``/``title`` serializer for the queryset's model."""
        self_model = self.queryset.model

        class _Serializer(serializers.ModelSerializer):
            class Meta:
                fields = ['id', 'title']
                model = self_model

        return _Serializer
class UserCreateAPIView(CreateAPIView):
    """Public registration endpoint: creates a user and logs them in."""
    permission_classes = [permissions.AllowAny]

    def get_serializer_class(self):
        """Build the registration serializer bound to the current request.

        The serializer closes over ``self.request``: it compares the
        submitted SMS code with ``request.session['sms']`` and logs the
        new user into that request's session.
        """
        request = self.request
        user_model = core_models.User

        class _Serializer(serializers.ModelSerializer):
            password = ExCharField()
            password_repeat = ExCharField()
            sms = ExCharField()
            # NOTE(review): is_agreement is required but its value is never
            # checked - confirm whether False should be rejected.
            is_agreement = ExBooleanField()

            def validate(self, attrs):
                """Check uniqueness, password confirmation and SMS code.

                :raises serializers.ValidationError: keyed by the field
                    that failed.
                """
                # .exists() asks the database for one row instead of
                # loading every match just to test truthiness.
                if user_model.objects.filter(email=attrs['email']).exists():
                    raise serializers.ValidationError(
                        {'email': [_('User with same email exists')]})
                if user_model.objects.filter(phone=attrs['phone']).exists():
                    raise serializers.ValidationError(
                        {'phone': [_('User with same phone exists')]})
                if attrs['password'] != attrs['password_repeat']:
                    raise serializers.ValidationError({
                        'password': [_('Passwords not equal')],
                        'password_repeat': [_('Passwords not equal')]
                    })
                if attrs['sms'] != request.session.get('sms'):
                    raise serializers.ValidationError({
                        'sms': [_('Wrong sms')]
                    })
                return attrs

            def create(self, validated_data):
                """Create the user, set the password and start a session.

                :return: the newly created user.
                """
                profile = {
                    'email': validated_data['email'],
                    'first_name': validated_data['first_name'],
                    'last_name': validated_data['last_name'],
                    'phone': validated_data['phone'],
                }
                # middle_name is accepted by this serializer (see
                # Meta.fields) but used to be dropped on create.
                if 'middle_name' in validated_data:
                    profile['middle_name'] = validated_data['middle_name']
                user = user_model.objects.create(**profile)
                user.set_password(validated_data['password'])
                user.save()

                user_auth = authenticate(
                    username=user.email,
                    password=validated_data['password'],
                )
                # authenticate() returns None when no backend accepts the
                # credentials; login() must not be called with None.
                if user_auth is not None:
                    login(request, user_auth)
                return user

            class Meta:
                model = core_models.User
                fields = ['first_name',
                          'last_name',
                          'middle_name',
                          'is_agreement',
                          'sms',
                          'email',
                          'phone',
                          'password',
                          'password_repeat']
                write_only_fields = ['password', 'password_repeat']
                read_only_fields = ['id']

        return _Serializer
class ProfileUpdateAPIView(RetrieveUpdateAPIView):
    """Read and update the profile of the requesting user."""
    permission_classes = [permissions.IsAuthenticated]
    pagination_class = None

    def get_object(self):
        """Return the requesting user; no URL lookup is involved."""
        return self.request.user

    def get_serializer_class(self):
        """Build the profile serializer for the current request.

        ``first_name`` and ``last_name`` are always required. When the
        payload has a truthy ``is_breeder`` the address, office,
        education, language, country and birth fields become required
        too. On PUT, an ``image`` value that is a string starting with
        ``http`` is replaced by the user's stored image when one exists.
        """
        data = self.request.data
        user = self.request.user
        required = {'required': True, 'allow_blank': False}
        ekw = {'first_name': required, 'last_name': required}

        # .get(): a payload without 'is_breeder' used to raise KeyError.
        if data and data.get('is_breeder'):
            for field_name in ['address', 'office', 'education']:
                ekw[field_name] = required
            for field_name in ['language', 'country', 'birth']:
                ekw[field_name] = {'required': True, 'allow_null': False}

        if self.request.method == 'PUT':
            image = data.get('image')
            # Only strings have .startswith; an uploaded file object used
            # to raise AttributeError here.
            if (isinstance(image, str) and image.startswith('http')
                    and user.image):
                # NOTE(review): this writes into request.data - confirm the
                # payload is always mutable on this path.
                data['image'] = user.image

        class _Serializer(serializers.ModelSerializer):
            image = Base64ImageField(allow_null=True)
            # NOTE(review): parser_classes is declared on the serializer,
            # not on the view - confirm whether it was meant for the view.
            parser_classes = [parsers.FormParser, parsers.MultiPartParser]

            class Meta:
                model = core_models.User
                fields = ['first_name', 'last_name', 'middle_name',
                          'image',
                          'country', 'city', 'address', 'office', 'birth',
                          'about', 'language', 'education', 'course', 'link']
                extra_kwargs = ekw

        return _Serializer
class LoginAPIView(APIView):
    """Log a user in by email or phone plus password."""
    permission_classes = [permissions.AllowAny]

    class _Serializer(serializers.Serializer):
        """Validate the credentials and expose the authenticated user."""
        username = serializers.CharField()
        password = serializers.CharField()

        def validate(self, attrs):
            """Resolve ``username`` to a user and authenticate it.

            On success the authenticated user is stored under
            ``attrs['user_auth']``. Every failure raises the same error on
            both fields.
            """
            login_name = attrs['username']
            lookup = Q(email=login_name) | Q(phone=login_name)
            account = core_models.User.objects.filter(lookup).first()
            failure = {
                'username': [_('wrong password or email or phone')],
                'password': [_('wrong password or email or phone')]
            }
            if not account:
                raise serializers.ValidationError(failure)

            # Authentication always goes through the account's email, even
            # when the account was found by phone.
            authenticated = authenticate(username=account.email,
                                         password=attrs['password'])
            if not authenticated:
                raise serializers.ValidationError(failure)

            attrs['user_auth'] = authenticated
            return attrs

    def post(self, request, *args, **kwargs):
        """Validate the posted credentials and log the user in."""
        form = LoginAPIView._Serializer(data=request.data)
        form.is_valid(raise_exception=True)
        login(request, form.validated_data['user_auth'])
        return Response({'success': True})
class AuthManagerViewSet(viewsets.GenericViewSet):
    """Account recovery actions available without authentication."""
    # A stray trailing comma used to turn this into a one-element tuple.
    email_template_name = 'apps/activation/mails/password_reset.j2'
    permission_classes = [permissions.AllowAny]

    def get_serializer_class(self):
        """Return the serializer for ``reset_password``, else ``None``.

        The serializer stores ``is_phone`` and ``user`` on itself during
        validation; ``reset_password`` reads ``user`` afterwards.
        """
        if self.action != 'reset_password':
            return None

        # Local import: the module does not import Django's ValidationError.
        from django.core.exceptions import (
            ValidationError as DjangoValidationError)

        user_model = core_models.User
        lookup_errors = (user_model.DoesNotExist,
                         user_model.MultipleObjectsReturned)

        class _Serializer(serializers.Serializer):
            username = serializers.CharField()

            def validate(self, attrs):
                """Resolve ``username`` (email or phone) to one user.

                :raises serializers.ValidationError: when the value is
                    malformed or does not identify exactly one user.
                """
                self.is_phone = False
                # e.g. +7950 0376541 (11 digits)
                username = attrs['username'].strip('+')
                if '@' in username:
                    try:
                        EmailValidator(message=_('invalid email'))(username)
                        self.user = user_model.objects.get(email=username)
                    except (DjangoValidationError,) + lookup_errors:
                        raise serializers.ValidationError(
                            {'username': [_('invalid email')]})
                else:
                    self.is_phone = True
                    # NOTE(review): re.match only checks that the value
                    # starts with 11 digits - confirm whether longer
                    # values should be rejected.
                    if re.match(r'\d{11}', username) is None:
                        raise serializers.ValidationError(
                            {'username': [_('invalid phone')]})
                    try:
                        self.user = user_model.objects.get(phone=username)
                    except lookup_errors:
                        raise serializers.ValidationError(
                            {'username': [_('invalid phone')]})
                return attrs

        return _Serializer

    @list_route(['post'])
    def reset_password(self, request, *args, **kwargs):
        """Email a password-reset link to the user named in the request.

        :return: ``{'success': True, 'email': ...}`` when the mail was
            handed to ``mail_send``; ``{'success': False, 'error': ...}``
            when sending raised.
        """
        srz = self.get_serializer(data=request.data)
        srz.is_valid(raise_exception=True)
        # TODO: maybe add phone recovery; for now the link is always
        # emailed, whether the user was found by email or by phone.
        user = srz.user
        site = get_current_site(request)
        token = PasswordResetTokenGenerator().make_token(user)
        # force_text: urlsafe_base64_encode returns bytes on older Django.
        uidb64 = force_text(urlsafe_base64_encode(force_bytes(user.pk)))
        path = reverse('activation:password_reset_confirm',
                       kwargs={'uidb64': uidb64, 'token': token})
        # Follow the scheme of the incoming request instead of always
        # sending a plain-http link.
        scheme = 'https' if request.is_secure() else 'http'
        url = '{}://{}{}'.format(scheme, site.domain, path)
        html = render_to_string(self.email_template_name, {
            'user': user,
            'url': url,
            'site': site
        })
        try:
            mail_send(user.email, message_html=html,
                      subject=_('password reset'))
        except Exception as e:
            # Best effort: report the failure in the body, as before.
            return Response({'success': False, 'error': str(e)})
        return Response({'success': True, 'email': user.email})
class CountryViewSet(BaseDictViewSet):
    """Read-only dictionary endpoint over all ``Country`` rows."""

    queryset = core_models.Country.objects.all()
class CityViewSet(viewsets.ReadOnlyModelViewSet):
    """Read-only city dictionary plus an action to pick the current city."""
    queryset = core_models.City.objects.all()
    permission_classes = [permissions.AllowAny]
    pagination_class = _paginate_cls(1000, 1000)

    def get_serializer_class(self):
        """Build an ``id``/``title`` serializer for the queryset's model."""
        self_model = self.queryset.model

        class _Serializer(serializers.ModelSerializer):
            class Meta:
                fields = ['id', 'title']
                model = self_model

        return _Serializer

    class CityDictBackend(BaseFilter):
        """Filter cities by title substring and/or country id."""
        # Both values are read from request.query_params below, so the
        # schema declares them as query parameters (they were 'path').
        action_map = {
            'list': [
                coreapi.Field(
                    name='term',
                    location='query',
                    required=False,
                    type='string'
                ),
                coreapi.Field(
                    name='country_id',
                    location='query',
                    required=False,
                    type='string'
                )
            ],
        }

        def filter_queryset(self, request, queryset, view):
            """Apply the ``term`` and ``country_id`` filters when given."""
            term = request.query_params.get('term')
            country_id = request.query_params.get('country_id')
            if term:
                queryset = queryset.filter(title__icontains=term)
            if country_id:
                queryset = queryset.filter(country_id=country_id)
            return queryset

    filter_backends = [CityDictBackend]

    @list_route(['post'])
    def city_set(self, request, *args, **kwargs):
        """Remember the city given by ``city_id`` for the current visitor.

        An authenticated user gets the city and its country saved on the
        user record; otherwise the city id is stored in the session under
        ``current_city_id``. Responds with 404 when no city matches.

        :return: ``{'city': <title>, 'country': <title>}``.
        """
        city = get_object_or_404(core_models.City,
                                 pk=request.data.get('city_id'))
        # is_authenticated is read as an attribute: calling it raises
        # TypeError on Django 2.0+, while the attribute form works on every
        # release that has django.urls.
        if request.user.is_authenticated:
            request.user.city = city
            request.user.country_id = city.country_id
            request.user.save()
        else:
            request.session['current_city_id'] = city.pk
        return Response({'city': city.title, 'country': city.country.title})
class LanguageViewSet(BaseDictViewSet):
    """Read-only dictionary endpoint over all ``Language`` rows."""

    queryset = core_models.Language.objects.all()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment