Skip to content

Instantly share code, notes, and snippets.

@codeboy
Last active April 5, 2021 10:29
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save codeboy/4266f9db6cf09734761eb965a273c366 to your computer and use it in GitHub Desktop.
Save codeboy/4266f9db6cf09734761eb965a273c366 to your computer and use it in GitHub Desktop.
examples

Набор примеров кода

*upd
tkinter_basic.py - https://gist.github.com/codeboy/0df005f20a9d4ba53c183bd8cbca32cd
Пример работы с tkinter gui. Вывод сообщений из внешнего api в лог текстового поля

db_init.py - https://gist.github.com/codeboy/4266f9db6cf09734761eb965a273c366#file-db_init-py
UPD Переделан пример инициализатора базы и добавления тестовых данных. Работает под Python 3.8 и Django 3.1 Добавлена интерактивность в shell. Кастомизируется и дорабатывается под проект.

drf_api.py - https://gist.github.com/codeboy/4266f9db6cf09734761eb965a273c366#file-drf_api-py
Пример DRF (Django Rest Framework) файла с воркерами, сериализаторами и валидацией

middleware.py - https://gist.github.com/codeboy/4266f9db6cf09734761eb965a273c366#file-middleware-py
Пример набора middleware для Django. Набор с разными изменениями переходит в разные проекты

proxmox_api.py - https://gist.github.com/codeboy/4266f9db6cf09734761eb965a273c366#file-proxmox_api-py
Пример работы с Proxmox Virtual Environment. Ранее использовалось на официальном сайте как пример

smsc_api.py - https://gist.github.com/codeboy/4266f9db6cf09734761eb965a273c366#file-smsc_api-py
Функционал для работы с https://smsc.ru/ Так же раньше использовалось на официальном сайте как пример

utils.py
Пример утилит так же переходящих в разные проекты с изменениями

import random
import secrets
import shutil
import sys, os
import importlib
from faker import Factory
import unipath
BASE_DIR = unipath.Path(__file__).absolute().ancestor(3)
sys.path.append(
os.path.join(os.path.dirname(__file__), 'navitest')
)
os.environ['DJANGO_SETTINGS_MODULE'] = 'navitest.settings'
import django
django.setup()
from django.utils.translation import ugettext_lazy as _
from django.core.management import call_command
from django.utils import lorem_ipsum
from django.utils.text import slugify
from django.conf import settings
from core.models import Post, CustomUser, LikeDislike
import bot_config
from django.apps import apps
USE_CONFIG = False
APP_NAME = 'core'
def init_db(is_reset=True, is_dir_clean=True):
if hasattr(settings, 'HAYSTACK_SIGNAL_PROCESSOR'):
settings.HAYSTACK_SIGNAL_PROCESSOR = \
'haystack.signals.BaseSignalProcessor'
if is_reset:
call_command('reset_db')
if is_dir_clean:
for dir2clean in ['cache', ]:
dir2clean = os.path.join(settings.BASE_DIR, 'media', dir2clean)
if os.path.exists(dir2clean):
shutil.rmtree(dir2clean)
migrate_apps = []
for app in settings.INSTALLED_APPS:
if app.startswith('apps'):
try:
dir2clean = os.path.join(settings.BASE_DIR,
app.replace('.', '/'), 'migrations')
if os.path.exists(dir2clean):
shutil.rmtree(dir2clean)
os.mkdir(dir2clean)
migrate_apps.append(app.split('.')[-1])
except Exception as e:
print('error', e)
call_command('makemigrations', *migrate_apps)
for app in migrate_apps:
module = importlib.import_module('apps.{}.migrations'.format(app))
importlib.reload(module)
call_command('migrate')
def simple_truncate(models_list=None):
print('ОЧИЩАЮТСЯ ТАБЛИЦЫ')
if models_list:
for model in models_list:
model = apps.get_model(APP_NAME, model)
model.objects.truncate()
def add_superuser():
admin = CustomUser(
username='admin',
email='admin@mm.ru',
first_name='admin first name',
last_name='admin last name',
is_active=True,
is_staff=True,
is_superuser=True,
)
admin.set_password('321')
admin.save()
print('Superuser создан.')
def add_users(users_count=3):
print('Start add users')
fake = Factory.create()
last_user = CustomUser.objects.latest('id')
users_list = list()
for i in range(users_count):
user = CustomUser(
username='user-{}'.format(last_user.pk+i),
email=fake.free_email(),
first_name=fake.first_name(),
last_name=fake.last_name(),
is_active=True,
)
user.set_password('321')
user.save()
users_list.append(user)
print('User {} created'.format(user.username))
return users_list
def add_posts(user=False, post_count=3):
print('Создаём записи...')
if not user:
users_list = CustomUser.objects.filter(is_superuser=False)
posts_list = list()
for i in range(post_count):
post = Post(
title = lorem_ipsum.sentence(),
content = lorem_ipsum.paragraph(),
)
post.author = user if user else secrets.choice(users_list)
post.save()
posts_list.append(post)
print('Post {} created'.format(post.id))
return posts_list
def echo_explain():
print(
'''Выбирете вариант
1 - Начать сброс БД
2 - Очистка таблиц
3 - Добавить суперпользователя
4 - Добавить пользователей
5 - Добавить записи
0 - Отмена \ Выход''')
def console():
while True:
try:
echo_explain()
input_num = int(input("Enter 1-7 or 0: " ))
if input_num == 1: init_warning()
if input_num == 2: simple_truncate(models_list=('Post', 'LikeDislike'))
if input_num == 3: add_superuser()
if input_num == 4: add_users()
if input_num == 5: add_posts()
if input_num == 0:
break
except ValueError:
print("Error! This is not a number. Try again.")
else:
print("....")
if __name__ == '__main__':
# init()
console()
import base64
import os
import coreapi
import coreschema
import re
import requests
from urllib.parse import urlencode
from django.contrib.auth.forms import PasswordResetForm
from django.contrib.auth.tokens import PasswordResetTokenGenerator
from django.contrib.sites.shortcuts import get_current_site
from django.core.files.base import ContentFile
from django.core.validators import EmailValidator
from django.db.models import Q
from django.shortcuts import render, get_object_or_404
from django.contrib.auth import authenticate, login
from django.template.loader import render_to_string
from django.urls import reverse
from django.utils.encoding import force_text, force_bytes
from django.utils.http import urlsafe_base64_encode
from django.utils.translation import ugettext_lazy as _
from rest_framework import permissions
from rest_framework import viewsets
from rest_framework import parsers
from rest_framework import mixins
from rest_framework.decorators import list_route, detail_route
from rest_framework.fields import SkipField
from rest_framework.filters import BaseFilterBackend
from rest_framework.generics import ListAPIView, CreateAPIView, GenericAPIView, \
UpdateAPIView, RetrieveAPIView, RetrieveUpdateAPIView
from rest_framework.pagination import LimitOffsetPagination
from rest_framework.response import Response
from rest_framework import status
from rest_framework import serializers
from rest_framework.views import APIView
from urllib.parse import urlparse
from common import utils as common_utils
from common.vt_mail import mail_send
from . import models as core_models
class ExLimitOffsetPagination(LimitOffsetPagination):
max_limit = 20
default_limit = 10
def _paginate_cls(i_max_limit=20, i_default_limit=10):
class _LimitOffsetPagination(LimitOffsetPagination):
max_limit = i_max_limit
default_limit = i_default_limit
return _LimitOffsetPagination
class BaseFilter(BaseFilterBackend):
action_map = {}
def get_schema_fields(self, view):
return self.action_map.get(view.action, [])
class PetViewSetBackend(BaseFilter):
action_map = {
'list': [
coreapi.Field(
name='pet',
location='path',
required=True,
type='string'
)
]
}
def filter_queryset(self, request, queryset, view):
return queryset.filter(pet_id=request.query_params.get('pet'))
class BaseExViewSet(viewsets.ViewSet):
# filter_backends = [PetViewSetBackend]
permission_classes = [permissions.IsAuthenticated]
serializer_cls = False
def list(self, request, *args, **kwargs):
cls = self.serializer_cls
model = cls.Meta.model
qset = model.objects.filter(
user=request.user, pet_id=kwargs['pet_id'])
return Response({'results': cls(qset, many=True).data})
@list_route(['post'])
def list_create_update(self, request, *args, **kwargs):
pet = get_object_or_404(core_models.Pet,
pk=kwargs.get('pet_id'),
user=request.user)
ids = {}
cls = self.serializer_cls
model = cls.Meta.model
queryset = model.objects.filter(
user=request.user, pet=pet)
instances = common_utils.qset_to_dict(queryset)
for item in request.data:
if 'id' in item:
ids[item['id']] = item['id']
if len(ids) or not len(request.data):
queryset.exclude(id__in=ids.keys()).delete()
for item in request.data:
instance = instances.get(item.get('id', None), None)
file = item.get('file', None)
if instance and hasattr(instance, 'file') and instance.file:
if not file or file.startswith('/media'):
item['file'] = instance.file
srz = cls(instance=instance, data=item)
srz.is_valid(raise_exception=True)
srz.save(user=request.user, pet_id=pet.pk)
return self.list(request=request, pet_id=pet.pk)
class PetViewSet(viewsets.ModelViewSet):
permission_classes = [permissions.IsAuthenticated]
pagination_class = ExLimitOffsetPagination
def get_queryset(self):
return core_models.Pet.objects.filter(user=self.request.user)
def get_serializer_class(self):
class _Serializer(serializers.ModelSerializer):
class Meta:
model = core_models.Pet
# TODO: add fields
exclude = ['user', 'order', 'created_at', 'updated_at', 'state']
return _Serializer
class Base64FileField(serializers.FileField):
def to_internal_value(self, data):
if isinstance(data, str) and 'base64' in data[0:100]:
# base64 encoded image - decode
# our custom format file:1.jpg;data:
format, filestr = data.split(
';base64,') # format ~= file:;Mime type;encoding, b64code
filename = format.split(';')[0].split(':')[-1] # guess file name
data = ContentFile(base64.b64decode(filestr), name=filename)
return super(Base64FileField, self).to_internal_value(data)
class Base64ImageField(serializers.ImageField):
def to_internal_value(self, data):
if isinstance(data, str) and data.startswith('data:image'):
# base64 encoded image - decode
format, imgstr = data.split(';base64,') # format ~= data:image/X,
ext = format.split('/')[-1] # guess file extension
data = ContentFile(base64.b64decode(imgstr), name='temp.' + ext)
return super(Base64ImageField, self).to_internal_value(data)
class PetImageViewSet(viewsets.ModelViewSet):
"""
Image
"""
permission_classes = [permissions.IsAuthenticated]
pagination_class = _paginate_cls(30, 30)
def filter_queryset(self, queryset):
if self.action == 'list':
return super(PetImageViewSet, self).filter_queryset(queryset)
return queryset
@list_route(['get', 'post'])
def list_create_update(self, request, *args, **kwargs):
queryset = self.filter_queryset(self.get_queryset())
instances = common_utils.qset_to_dict(queryset)
ids = {}
for item in request.data['items']:
if 'id' in item:
ids[item['id']] = item['id']
if len(ids) or not len(request.data['items']):
queryset.exclude(id__in=ids.keys()).delete()
for item in request.data['items']:
instance = instances.get(item.get('id', None), None)
if instance and not item['image'] or item['image'].startswith(
'http'):
item['image'] = instance.image if instance.image else None
serializer = self.get_serializer(instance=instance,
data=item)
serializer.is_valid(raise_exception=True)
serializer.save()
return self.list(request, *args, **kwargs)
def get_queryset(self):
return core_models.PetImage.objects.filter(
user=self.request.user)
def get_serializer_class(self):
class _Serializer(serializers.ModelSerializer):
image = Base64ImageField()
parser_classes = [parsers.FormParser, parsers.MultiPartParser]
class Meta:
model = core_models.PetImage
fields = ['id', 'pet', 'image', 'name', 'created', 'is_cover']
return _Serializer
##### HEALTH ########
class PetWeightViewSet(viewsets.ModelViewSet):
"""
Health - weight
retrieve:
Health - return the given weight.
list:
Health - return a list of all the existing weights.
create:
Health - create a new weight instance.
delete:
Health - delete weight
"""
permission_classes = [permissions.IsAuthenticated]
pagination_class = ExLimitOffsetPagination
def get_queryset(self):
return core_models.PetWeight.objects.filter(
user=self.request.user,
)
def get_serializer_class(self):
class _Serializer(serializers.ModelSerializer):
class Meta:
model = core_models.PetWeight
exclude = ['order', 'created_at', 'updated_at', 'state']
return _Serializer
class PetHealthSerializer(serializers.ModelSerializer):
class Meta:
model = core_models.PetHealth
fields = ['id', 'is_castrated', 'is_flea_safe', 'blood_type']
class PetWeightSerializer(serializers.ModelSerializer):
class Meta:
model = core_models.PetWeight
fields = ['id', 'weight', 'weight_date']
class PetVaccSerializer(serializers.ModelSerializer):
class Meta:
model = core_models.PetVacc
fields = ['id', 'vacc', 'vacc_date', 'fio', 'company']
class PetChipSerializer(serializers.ModelSerializer):
class Meta:
model = core_models.PetChip
fields = ['id', 'chip', 'chip_date', 'fio', 'clinic']
class ExHealthViewSet(viewsets.ViewSet):
# filter_backends = [PetViewSetBackend]
permission_classes = [permissions.IsAuthenticated]
def list(self, request, *args, **kwargs):
health = core_models.PetHealth.objects.filter(
user=request.user,
pet_id=kwargs.get('pet_id')).first()
if health:
health_srz = PetHealthSerializer(health)
dc_response = {'pethealth': health_srz.data}
else:
dc_response = {'pethealth': {}}
for cls in [PetWeightSerializer, PetAntiHelmSerializer,
PetVaccSerializer, PetChipSerializer]:
model = cls.Meta.model
qset = model.objects.filter(user=request.user)
# return queryset.filter(pet_id=request.query_params.get('pet'))
if request.query_params.get('pet'):
qset = qset.filter(pet_id=request.query_params.get('pet'))
dc_response[model._meta.model_name] = cls(
qset,
many=True).data
return Response({'results': dc_response}) # serialazer
@list_route(['post'])
def list_create_update(self, request, *args, **kwargs):
data = request.data
pet = get_object_or_404(core_models.Pet,
pk=kwargs.get('pet_id'),
user=request.user)
pethealth = core_models.PetHealth.objects.filter(
user=request.user,
pet=pet).first()
srz = PetHealthSerializer(instance=pethealth,
data=data['pethealth'])
srz.is_valid(raise_exception=True)
srz.save(user=request.user, pet_id=pet.pk)
for cls in [PetWeightSerializer, PetAntiHelmSerializer,
PetVaccSerializer, PetChipSerializer]:
ids = {}
model = cls.Meta.model
queryset = model.objects.filter(user=request.user, pet=pet)
instances = common_utils.qset_to_dict(queryset)
model_name = model._meta.model_name
for item in request.data[model_name]:
if 'id' in item:
ids[item['id']] = item['id']
if len(ids) or not len(request.data[model_name]):
queryset.exclude(id__in=ids.keys()).delete()
for item in request.data[model_name]:
instance = instances.get(item.get('id', None), None)
srz = cls(instance=instance, data=item)
srz.is_valid(raise_exception=True)
srz.save(user=request.user, pet_id=pet.pk)
return self.list(request=request, pet_id=pet.pk)
class PetAntiHelmViewSet(viewsets.ModelViewSet):
"""
Health - antihelm
"""
permission_classes = [permissions.IsAuthenticated]
pagination_class = ExLimitOffsetPagination
def get_queryset(self):
return core_models.PetAntiHelm.objects.filter(
user=self.request.user,
)
def get_serializer_class(self):
class _Serializer(serializers.ModelSerializer):
class Meta:
model = core_models.PetAntiHelm
exclude = ['order', 'created_at', 'updated_at', 'state']
return _Serializer
class HelmViewSet(viewsets.ReadOnlyModelViewSet):
"""
Health - antihelm preparate
"""
permission_classes = [permissions.IsAuthenticated]
pagination_class = _paginate_cls(1000, 1000)
def get_queryset(self):
return core_models.Helm.objects.all()
def get_serializer_class(self):
class _Serializer(serializers.ModelSerializer):
class Meta:
model = core_models.Helm
fields = ['id', 'title']
return _Serializer
class VaccViewSet(viewsets.ReadOnlyModelViewSet):
"""
Health - Vaccine preparate
"""
permission_classes = [permissions.IsAuthenticated]
pagination_class = _paginate_cls(1000, 1000)
def get_queryset(self):
return core_models.Vacc.objects.all()
def get_serializer_class(self):
class _Serializer(serializers.ModelSerializer):
class Meta:
model = core_models.Vacc
fields = ['id', 'title']
return _Serializer
class PetVaccViewSet(viewsets.ModelViewSet):
"""
Health - Vaccine
"""
permission_classes = [permissions.IsAuthenticated]
pagination_class = ExLimitOffsetPagination
def get_queryset(self):
return core_models.PetVacc.objects.filter(
user=self.request.user,
)
def get_serializer_class(self):
class _Serializer(serializers.ModelSerializer):
class Meta:
model = core_models.PetVacc
exclude = ['order', 'created_at', 'updated_at', 'state']
return _Serializer
class PetChipViewSet(viewsets.ReadOnlyModelViewSet):
"""
Health - Chip
"""
permission_classes = [permissions.IsAuthenticated]
pagination_class = _paginate_cls(1000, 1000)
def get_queryset(self):
return core_models.PetChip.objects.filter(
user=self.request.user,
)
def get_serializer_class(self):
class _Serializer(serializers.ModelSerializer):
class Meta:
model = core_models.PetChip
exclude = ['order', 'created_at', 'updated_at', 'state']
return _Serializer
##### FEED ####
class FeedBrandViewSet(viewsets.ReadOnlyModelViewSet):
"""
Feed - brand dry
"""
permission_classes = [permissions.IsAuthenticated]
pagination_class = ExLimitOffsetPagination
def get_queryset(self):
return core_models.FeedBrand.objects.filter(is_wet=False)
def get_serializer_class(self):
class _Serializer(serializers.ModelSerializer):
class Meta:
model = core_models.FeedBrand
fields = ['id', 'title', 'is_wet']
return _Serializer
class FeedBrandWetViewSet(viewsets.ReadOnlyModelViewSet):
"""
Feed - brand wet
"""
permission_classes = [permissions.IsAuthenticated]
pagination_class = ExLimitOffsetPagination
def get_queryset(self):
return core_models.FeedBrand.objects.filter(is_wet=True)
def get_serializer_class(self):
class _Serializer(serializers.ModelSerializer):
class Meta:
model = core_models.FeedBrand
fields = ['id', 'title', 'is_wet']
return _Serializer
class FillerBrandViewSet(viewsets.ReadOnlyModelViewSet):
"""
Feed - filler brand
"""
permission_classes = [permissions.IsAuthenticated]
pagination_class = ExLimitOffsetPagination
def get_queryset(self):
return core_models.FillerBrand.objects.all()
def get_serializer_class(self):
class _Serializer(serializers.ModelSerializer):
class Meta:
model = core_models.FillerBrand
fields = ['id', 'title']
return _Serializer
# DEPRECATED
class PetFeedViewSet(
mixins.RetrieveModelMixin,
mixins.UpdateModelMixin,
viewsets.GenericViewSet):
"""
Feed
"""
permission_classes = [permissions.IsAuthenticated]
pagination_class = ExLimitOffsetPagination
def get_queryset(self):
return core_models.Pet.objects.filter(
user=self.request.user,
)
def get_serializer_class(self):
class _Serializer(serializers.ModelSerializer):
class Meta:
model = core_models.Pet
fields = ['id',
'feed_brand_dry',
'feed_brand_wet',
'extra_feed',
'is_tray',
'is_filler',
'filler_brand']
return _Serializer
###### Exhibition ####
class ExhibitionViewSet(viewsets.ModelViewSet):
"""
Exhibition
"""
permission_classes = [permissions.IsAuthenticated]
pagination_class = ExLimitOffsetPagination
def get_queryset(self):
return core_models.Exhibition.objects.filter(
user=self.request.user,
).order_by('id')
def get_serializer_class(self):
class _Serializer(serializers.ModelSerializer):
class Meta:
model = core_models.Exhibition
exclude = ['order', 'created_at', 'updated_at', 'state']
return _Serializer
class ExhibitionSerializer(serializers.ModelSerializer):
file = Base64FileField(required=False,
allow_null=True)
filename = serializers.SerializerMethodField(required=False,
allow_null=True)
def get_filename(self, obj):
if obj.file:
return os.path.basename(obj.file.name)
return ''
parser_classes = [parsers.FormParser, parsers.MultiPartParser]
class Meta:
model = core_models.Exhibition
fields = ['id', 'date_from', 'date_to', 'association', 'club',
'ex_class',
'ex_title', 'city', 'fio',
'file', 'filename']
class ExExhibitionViewSet(BaseExViewSet):
serializer_cls = ExhibitionSerializer
class PetRankSerializer(serializers.ModelSerializer):
file = Base64FileField(required=False,
allow_null=True)
filename = serializers.SerializerMethodField(required=False,
allow_null=True)
def get_filename(self, obj):
if obj.file:
return os.path.basename(obj.file.name)
return ''
parser_classes = [parsers.FormParser, parsers.MultiPartParser]
class Meta:
model = core_models.PetRank
fields = ['id', 'association', 'rank', 'file', 'filename']
# fields = ['id', 'association', 'rank']
class PetRankViewSet(viewsets.ModelViewSet):
"""
Pet Rank
"""
permission_classes = [permissions.IsAuthenticated]
pagination_class = ExLimitOffsetPagination
serializer_class = PetRankSerializer
def get_queryset(self):
return core_models.PetRank.objects.filter(
user=self.request.user,
pet_id=self.kwargs['pet_id']
)
class ExPetRankViewSet(BaseExViewSet):
"""
Pet Rank - list C&U
"""
serializer_cls = PetRankSerializer
class AssociationViewSet(viewsets.ReadOnlyModelViewSet):
"""
Exhibition - association
"""
permission_classes = [permissions.IsAuthenticated]
pagination_class = ExLimitOffsetPagination
def get_queryset(self):
return core_models.Association.objects.all()
def get_serializer_class(self):
class _Serializer(serializers.ModelSerializer):
class Meta:
model = core_models.Association
fields = ['id', 'title']
return _Serializer
class RankViewSet(viewsets.ReadOnlyModelViewSet):
"""
Exhibition - rank
"""
permission_classes = [permissions.IsAuthenticated]
pagination_class = ExLimitOffsetPagination
def get_queryset(self):
return core_models.Rank.objects.all()
def get_serializer_class(self):
class _Serializer(serializers.ModelSerializer):
class Meta:
model = core_models.Rank
fields = ['id', 'title']
return _Serializer
#### bloodline ###
class ClubViewSet(viewsets.ModelViewSet):
"""
Bloodline - Club
"""
permission_classes = [permissions.IsAuthenticated]
pagination_class = ExLimitOffsetPagination
def get_queryset(self):
return core_models.Club.objects.all()
def get_serializer_class(self):
class _Serializer(serializers.ModelSerializer):
class Meta:
model = core_models.Club
fields = ['id', 'title']
return _Serializer
class MetriсViewSet(viewsets.ModelViewSet):
"""
Bloodline - Metric
"""
permission_classes = [permissions.IsAuthenticated]
pagination_class = ExLimitOffsetPagination
filter_backends = [PetViewSetBackend]
def get_queryset(self):
return core_models.PetMetric.objects.filter(
user=self.request.user,
)
def filter_queryset(self, queryset):
if self.action == 'list':
return super(MetriсViewSet, self).filter_queryset(queryset)
return queryset
def get_serializer_class(self):
class _Serializer(serializers.ModelSerializer):
class Meta:
model = core_models.PetMetric
exclude = ['order', 'created_at', 'updated_at', 'state']
return _Serializer
class MetriсSerializer(serializers.ModelSerializer):
file = Base64FileField(required=False,
allow_null=True)
filename = serializers.SerializerMethodField(required=False,
allow_null=True)
def get_filename(self, obj):
if obj.file:
return os.path.basename(obj.file.name)
return ''
parser_classes = [parsers.FormParser, parsers.MultiPartParser]
class Meta:
model = core_models.PetMetric
fields = ['id', 'metric_number', 'club', 'club_system',
'file', 'filename'
]
class BloodlineSerializer(serializers.ModelSerializer):
file = Base64FileField(required=False,
allow_null=True)
filename = serializers.SerializerMethodField(required=False,
allow_null=True)
def get_filename(self, obj):
if obj.file:
return os.path.basename(obj.file.name)
return ''
parser_classes = [parsers.FormParser, parsers.MultiPartParser]
class Meta:
model = core_models.PetBloodline
fields = ['id', 'metric_number', 'club', 'club_system',
'file', 'filename'
]
class PetParentSerializer(serializers.ModelSerializer):
file = Base64FileField(required=False,
allow_null=True)
filename = serializers.SerializerMethodField(required=False,
allow_null=True)
def get_filename(self, obj):
if obj.file:
return os.path.basename(obj.file.name)
return ''
class Meta:
model = core_models.PetParent
fields = ['id', 'pet', 'pet_parent', 'gender', 'breed',
'nickname', 'breeder_fio', 'owner_fio', 'color', 'rank',
'file', 'filename']
class ExBloodlineViewSet(viewsets.ViewSet):
filter_backends = [PetViewSetBackend]
permission_classes = [permissions.IsAuthenticated]
srzs_list = [BloodlineSerializer, MetriсSerializer,
# PetParentSerializer # don't need it any more
]
def list(self, request, *args, **kwargs):
dc_response = {}
for cls in self.srzs_list:
model = cls.Meta.model
qset = model.objects.filter(user=request.user)
if kwargs.get('pet_id'):
qset = qset.filter(
pet_id=kwargs.get('pet_id')).order_by('id')
dc_response[model._meta.model_name] = cls(
qset, many=True).data
q_parents = core_models.PetParent.objects.filter(
user=request.user, pet_id=kwargs.get('pet_id'))
try:
dc_response['mother'] = PetParentSerializer(
q_parents.get(gender=2)).data
except core_models.PetParent.DoesNotExist:
pass
try:
dc_response['father'] = PetParentSerializer(
q_parents.get(gender=3)).data
except core_models.PetParent.DoesNotExist:
pass
return Response({'results': dc_response})
@list_route(['post'])
def list_create_update(self, request, *args, **kwargs):
data = request.data
pet = get_object_or_404(core_models.Pet,
pk=kwargs.get('pet_id'),
user=request.user)
for cls in self.srzs_list:
ids = {}
model = cls.Meta.model
queryset = model.objects.filter(user=request.user, pet=pet)
instances = common_utils.qset_to_dict(queryset)
model_name = model._meta.model_name
for item in request.data[model_name]:
if 'id' in item:
ids[item['id']] = item['id']
if len(ids) or not len(request.data[model_name]):
queryset.exclude(id__in=ids.keys()).delete()
for item in request.data[model_name]:
instance = instances.get(item.get('id', None), None)
file = item.get('file', None)
if instance and hasattr(instance, 'file') and instance.file:
if not file or file.startswith('/media'):
item['file'] = instance.file
srz = cls(instance=instance, data=item)
srz.is_valid(raise_exception=True)
srz.save(user=request.user, pet_id=pet.pk)
mother = request.data['mother']
if len(mother) >= 1:
queryset = core_models.PetParent.objects.filter(
user=request.user, pet=pet)
instances = common_utils.qset_to_dict(queryset)
instance = instances.get(mother.get('id', None), None)
file = mother.get('file', None)
if instance and hasattr(instance, 'file') and instance.file:
if not file or file.startswith('/media'):
mother['file'] = instance.file
srz = PetParentSerializer(instance=instance, data=mother)
srz.is_valid(raise_exception=True)
srz.save(user=request.user, pet_id=pet.pk)
father = request.data['father']
if len(father) >= 1:
queryset = core_models.PetParent.objects.filter(
user=request.user, pet=pet)
instances = common_utils.qset_to_dict(queryset)
instance = instances.get(father.get('id', None), None)
file = father.get('file', None)
if instance and hasattr(instance, 'file') and instance.file:
if not file or file.startswith('/media'):
father['file'] = instance.file
srz = PetParentSerializer(instance=instance, data=father)
srz.is_valid(raise_exception=True)
srz.save(user=request.user, pet_id=pet.pk)
return self.list(request=request, pet_id=pet.pk)
class BreedViewSet(viewsets.ModelViewSet):
"""
Bloodline - Breed
"""
permission_classes = [permissions.IsAuthenticated]
pagination_class = ExLimitOffsetPagination
def get_queryset(self):
return core_models.Breed.objects.all()
def get_serializer_class(self):
class _Serializer(serializers.ModelSerializer):
class Meta:
model = core_models.Breed
fields = ['id', 'title']
return _Serializer
##### SELL ####
class CurrencyViewSet(viewsets.ReadOnlyModelViewSet):
"""
Currency
"""
permission_classes = [permissions.IsAuthenticated]
pagination_class = ExLimitOffsetPagination
def get_queryset(self):
return core_models.Currency.objects.all()
def get_serializer_class(self):
class _Serializer(serializers.ModelSerializer):
class Meta:
model = core_models.Currency
fields = ['id', 'title', 'rate']
return _Serializer
class SellTypeViewSet(viewsets.ReadOnlyModelViewSet):
"""
Sell Type
"""
permission_classes = [permissions.IsAuthenticated]
pagination_class = ExLimitOffsetPagination
def get_queryset(self):
return core_models.SellType.objects.all()
def get_serializer_class(self):
class _Serializer(serializers.ModelSerializer):
class Meta:
model = core_models.SellType
fields = ['id', 'title']
return _Serializer
class ExBreedViewSet(viewsets.ViewSet):
pagination_class = _paginate_cls(1000, 1000)
def _response_get(self, md):
class _Serializer(serializers.ModelSerializer):
class Meta:
model = md
fields = ['id', 'title']
srz = _Serializer(md.objects.all(), many=True)
return Response({'results': srz.data})
def list(self, request, *args, **kwargs):
return self._response_get(core_models.Breed)
@list_route(['GET'])
def search(self, request, *args, **kwargs):
class _Serializer(serializers.ModelSerializer):
class Meta:
model = core_models.Breed
fields = ['id', 'title']
srz = _Serializer(core_models.Breed.objects.filter(
title__icontains=request.query_params.get('term', ''))[0:40],
many=True)
# return Response({'results': srz.data})
return Response({'results': srz.data})
def retrieve(self, request, pk=None):
class _Serializer(serializers.ModelSerializer):
class Meta:
model = core_models.ColorData
fields = ['id', 'title']
breed = get_object_or_404(core_models.Breed, pk=pk)
return Response(_Serializer(breed).data)
class ColorViewSet(viewsets.ViewSet):
pagination_class = _paginate_cls(1000, 1000)
def _response_get(self, md):
class _Serializer(serializers.ModelSerializer):
class Meta:
model = md
fields = ['id', 'title']
srz = _Serializer(md.objects.all(), many=True)
return Response({'results': srz.data})
@list_route(['GET'])
def list_color_main(self, request, *args, **kwargs):
return self._response_get(core_models.ColorMain)
@list_route(['GET'])
def list_color_eye(self, request, *args, **kwargs):
return self._response_get(core_models.ColorEye)
@list_route(['GET'])
def list_color_image(self, request, *args, **kwargs):
return self._response_get(core_models.ColorImage)
@list_route(['GET'])
def list_color_white_image(self, request, *args, **kwargs):
return self._response_get(core_models.ColorWhiteImage)
@list_route(['GET'])
def list_color_spot(self, request, *args, **kwargs):
return self._response_get(core_models.ColorSpot)
@list_route(['GET'])
def list_color_point(self, request, *args, **kwargs):
return self._response_get(core_models.ColorPoint)
def retrieve(self, request, pk=None):
class _Serializer(serializers.ModelSerializer):
class Meta:
model = core_models.ColorData
fields = ['id', 'title', 'color_main', 'color_image',
'color_white_image', 'color_point',
'color_gold_silver']
color_data = get_object_or_404(core_models.ColorData, pk=pk)
return Response(_Serializer(color_data).data)
@list_route(['GET'])
def sub_color_search(self, request, *args, **kwargs):
query = {}
for field in ['color_main', 'color_image', 'color_white_image',
'color_point', 'color_gold_silver']:
field_name = '{}_id'.format(field)
query[field_name] = request.query_params.get(field, '')
if not query[field_name]:
query[field_name] = None
class _Serializer(serializers.ModelSerializer):
class Meta:
model = core_models.ColorData
fields = ['id', 'title', 'color_main', 'color_image',
'color_white_image', 'color_point',
'color_gold_silver']
item = core_models.ColorData.objects.filter(**query).first()
if item:
return Response(_Serializer(item).data)
return Response(status=status.HTTP_204_NO_CONTENT)
@list_route(['GET'])
def color_search(self, request, *args, **kwargs):
class _Serializer(serializers.ModelSerializer):
class Meta:
model = core_models.ColorData
fields = ['id', 'title']
srz = _Serializer(core_models.ColorData.objects.filter(
title__icontains=request.query_params.get('term', ''))[0:40],
many=True)
# return Response({'results': srz.data})
return Response({'results': srz.data})
class ExCharField(serializers.CharField):
def get_attribute(self, instance):
raise SkipField()
class ExEmailField(serializers.EmailField):
def get_attribute(self, instance):
raise SkipField()
class ExBooleanField(serializers.BooleanField):
def get_attribute(self, instance):
raise SkipField()
class UserCreateAPIView(CreateAPIView):
permission_classes = [permissions.AllowAny]
def get_serializer_class(self):
request = self.request
class _Serializer(serializers.ModelSerializer):
password = ExCharField()
password_repeat = ExCharField()
sms = ExCharField()
is_agreement = ExBooleanField()
def validate(self, attrs):
if core_models.User.objects.filter(email=attrs['email']):
raise serializers.ValidationError(
{'email': [_('User with same email exists')]})
if core_models.User.objects.filter(phone=attrs['phone']):
raise serializers.ValidationError(
{'phone': [_('User with same phone exists')]})
if attrs['password'] != attrs['password_repeat']:
raise serializers.ValidationError({
'password': [_('Passwords not equal')],
'password_repeat': [_('Passwords not equal')]
})
if attrs['sms'] != request.session.get('sms'):
raise serializers.ValidationError({
'sms': [_('Wrong sms')]
})
return attrs
def create(self, validated_data):
user = core_models.User.objects.create(
email=validated_data['email'],
first_name=validated_data['first_name'],
last_name=validated_data['last_name'],
phone=validated_data['phone']
)
user.set_password(validated_data['password'])
user.save()
user_auth = authenticate(
username=user.email,
password=validated_data['password'],
)
login(request, user_auth)
return user
class Meta:
model = core_models.User
fields = ['first_name',
'last_name',
'middle_name',
'is_agreement',
'sms',
'email',
'phone',
'password',
'password_repeat']
write_only_fields = ['password', 'password_repeat']
read_only_fields = ['id']
return _Serializer
class ProfileUpdateAPIView(RetrieveUpdateAPIView):
permission_classes = [permissions.IsAuthenticated]
pagination_class = None
def get_object(self):
return self.request.user
def get_serializer_class(self):
data = self.request.data
required = {'required': True, 'allow_blank': False}
ekw = {'first_name': required, 'last_name': required}
user = self.request.user
if data and data['is_breeder']:
for field_name in ['address', 'office', 'education']:
ekw[field_name] = required
ekw['language'] = {'required': True, 'allow_null': False}
ekw['country'] = {'required': True, 'allow_null': False}
ekw['birth'] = {'required': True, 'allow_null': False}
if self.request.method == 'PUT' and data.get('image') and data[
'image'].startswith('http') and user.image:
data['image'] = user.image
class _Serializer(serializers.ModelSerializer):
image = Base64ImageField(allow_null=True)
parser_classes = [parsers.FormParser, parsers.MultiPartParser]
class Meta:
model = core_models.User
fields = ['first_name', 'last_name', 'middle_name',
'image',
'is_breeder',
'country', 'city', 'address', 'office', 'birth',
'about', 'language', 'education', 'course', 'link']
extra_kwargs = ekw
return _Serializer
class SmsAPIView(APIView):
permission_classes = [permissions.AllowAny]
def post(self, request, *args, **kwargs):
from django.utils.crypto import get_random_string
from django.conf import settings
from apps.core.smsc_api import SMSC
if getattr(settings, 'SKIP_SMS'):
request.session['sms'] = '123'
return Response({'success': True})
try:
phone = re.match('\d{11}', request.data.get('phone', '')).group(0)
code = get_random_string(length=4, allowed_chars='1234567890')
smsc = SMSC()
smsc.send_sms(phone, 'Onpet.me Код: {}'.format(code))
request.session['sms'] = code
return Response({'success': True})
except Exception as e:
return Response({'phone': [_('invalid phone number')]},
status=status.HTTP_400_BAD_REQUEST)
class BaseDictViewSet(viewsets.ReadOnlyModelViewSet):
class BaseDictBackend(BaseFilter):
action_map = {
'list': [
coreapi.Field(
name='term',
location='path',
required=False,
type='string'
)
]
}
def filter_queryset(self, request, queryset, view):
term = request.query_params.get('term')
if term:
return queryset.filter(title__icontains=term)
return queryset
permission_classes = [permissions.AllowAny]
pagination_class = _paginate_cls(1000, 1000)
filter_backends = [BaseDictBackend]
def get_serializer_class(self):
self_model = self.queryset.model
class _Serializer(serializers.ModelSerializer):
class Meta:
fields = ['id', 'title']
model = self_model
return _Serializer
class CountryViewSet(BaseDictViewSet):
queryset = core_models.Country.objects.all()
class CityViewSet(viewsets.ReadOnlyModelViewSet):
queryset = core_models.City.objects.all()
permission_classes = [permissions.AllowAny]
pagination_class = _paginate_cls(1000, 1000)
def get_serializer_class(self):
self_model = self.queryset.model
class _Serializer(serializers.ModelSerializer):
class Meta:
fields = ['id', 'title']
model = self_model
return _Serializer
class CityDictBackend(BaseFilter):
action_map = {
'list': [
coreapi.Field(
name='term',
location='path',
required=False,
type='string'
),
coreapi.Field(
name='country_id',
location='path',
required=False,
type='string'
)
],
}
def filter_queryset(self, request, queryset, view):
term = request.query_params.get('term')
country_id = request.query_params.get('country_id')
if term:
queryset = queryset.filter(title__icontains=term)
if country_id:
queryset = queryset.filter(country_id=country_id)
return queryset
filter_backends = [CityDictBackend]
@list_route(['post'])
def city_set(self, request, *args, **kwargs):
city = get_object_or_404(core_models.City,
pk=request.data.get('city_id'))
if request.user.is_authenticated():
request.user.city = city
request.user.country_id = city.country_id
request.user.save()
else:
request.session['current_city_id'] = city.pk
return Response({'city': city.title, 'country': city.country.title})
class LanguageViewSet(BaseDictViewSet):
queryset = core_models.Language.objects.all()
class NurseryViewSet(viewsets.ModelViewSet):
permission_classes = [permissions.AllowAny]
pagination_class = None
def get_queryset(self):
return core_models.Nursery.objects.filter(user=self.request.user)
def perform_create(self, serializer):
# TODO: remove with list edit
core_models.Nursery.objects.filter(user=self.request.user).delete()
serializer.save(user=self.request.user)
def update(self, request, *args, **kwargs):
instance = self.get_object()
if not request.data['cert'] or 'media/' in request.data['cert']:
request.data['cert'] = instance.cert if instance.cert else None
if not request.data['diploma'] or 'media/' in request.data['diploma']:
request.data[
'diploma'] = instance.diploma if instance.diploma else None
return super(NurseryViewSet, self).update(request, *args, **kwargs)
def get_serializer_class(self):
class _Serializer(serializers.ModelSerializer):
cert = Base64FileField()
diploma = Base64FileField(allow_null=True)
parser_classes = [parsers.FormParser, parsers.MultiPartParser]
cert_filename = serializers.SerializerMethodField(required=False,
allow_null=True)
diploma_filename = serializers.SerializerMethodField(required=False,
allow_null=True)
def get_cert_filename(self, obj):
if obj.cert:
return os.path.basename(obj.cert.name)
return ''
def is_valid(self, raise_exception=False):
return super(_Serializer, self).is_valid(
raise_exception=raise_exception)
def get_diploma_filename(self, obj):
if obj.diploma:
return os.path.basename(obj.diploma.name)
return ''
class Meta:
fields = ['id', 'title', 'register_at', 'association', 'club',
'link', 'cert', 'diploma', 'cert_filename',
'diploma_filename']
model = core_models.Nursery
extra_kwargs = {
'register_at': {'allow_null': True}}
return _Serializer
class LoginAPIView(APIView):
permission_classes = [permissions.AllowAny]
class _Serializer(serializers.Serializer):
username = serializers.CharField()
password = serializers.CharField()
def validate(self, attrs):
user = core_models.User.objects.filter(Q(email=attrs['username'])
| Q(
phone=attrs['username'])).first()
err = {
'username': [_('wrong password or email or phone')],
'password': [_('wrong password or email or phone')]
}
if not user:
raise serializers.ValidationError(err)
user_auth = authenticate(
username=user.email,
password=attrs['password'],
)
if not user_auth:
raise serializers.ValidationError(err)
attrs['user_auth'] = user_auth
return attrs
def post(self, request, *args, **kwargs):
serializer = LoginAPIView._Serializer(data=request.data)
serializer.is_valid(raise_exception=True)
validated_data = serializer.validated_data
login(request, validated_data['user_auth'])
return Response({'success': True})
class AuthManagerViewSet(viewsets.GenericViewSet):
# filter_backends = [PetViewSetBackend]
email_template_name = 'apps/activation/mails/password_reset.j2',
permission_classes = [permissions.AllowAny]
def get_serializer_class(self):
if self.action == 'reset_password':
class _Serializer(serializers.Serializer):
username = serializers.CharField()
def validate(self, attrs):
self.is_phone = False
# EmailValidator(message=self.error_messages['invalid'])
# +7950 0376541 (11)
username = attrs['username'].strip('+')
if '@' in username:
try:
EmailValidator(message=_('invalid email'))(username)
self.user = core_models.User.objects.get(
email=username)
except Exception as e:
raise serializers.ValidationError(
{'username': [_('invalid email')]})
else:
self.is_phone = True
try:
re.match('\d{11}', username).group(0)
self.user = core_models.User.objects.get(
phone=username)
except Exception as e:
raise serializers.ValidationError(
{'username': [_('invalid phone')]})
return attrs
return _Serializer
return None
@list_route(['post'])
def reset_password(self, request, *args, **kwargs):
srz = self.get_serializer(data=request.data)
srz.is_valid(raise_exception=True)
if srz.is_phone:
# TODO: maybe add phone rcovery
pass
site = get_current_site(request)
token = PasswordResetTokenGenerator().make_token(srz.user)
url = 'http://{}{}'.format(site.domain,
reverse('activation:password_reset_confirm',
kwargs={'uidb64': urlsafe_base64_encode(
force_bytes(srz.user.pk)), 'token': token}))
html = render_to_string(self.email_template_name, {
'user': srz.user,
'url': url,
'site': site
})
try:
mail_send(srz.user.email, message_html=html,
subject=_('password reset'))
return Response({'success': True, 'email': srz.user.email})
except Exception as e:
return Response({'success': False, 'error': str(e)})
return Response({'hello': 'world'})
import urllib
import re
from django import http
from django.urls import resolve
from django.shortcuts import render
from django.utils.deprecation import MiddlewareMixin
from django.middleware.locale import LocaleMiddleware
from django.utils import translation
from django.conf import settings
from six import iteritems
from common.thread_locals import get_current_site
from .thread_locals import set_thread_var
from .cachedtree import CacheTree
class MiddlewareInfoView(MiddlewareMixin):
def __init__(self, get_response):
self.get_response = get_response
def process_view(self, request, view_func, view_args, view_kwargs):
try:
resolver = resolve(request.path)
request.vt_view = {
'resolver': resolver,
'view_func_name': view_func.__name__,
'path': request.path,
'namespace': resolver.namespace,
'view_name': resolver.view_name,
'short_view_name': resolver.view_name.split(':')[-1],
'view_func': view_func,
'view_args': view_args,
'view_kwargs': view_kwargs,
}
except BaseException as e:
request.vt_view = {}
class MiddlewareMultipleProxy(MiddlewareMixin):
FORWARDED_FOR_FIELDS = [
'HTTP_X_FORWARDED_FOR',
'HTTP_X_FORWARDED_HOST',
'HTTP_X_FORWARDED_SERVER',
]
def __init__(self, get_response):
self.get_response = get_response
def process_request(self, request):
"""
Rewrites the proxy headers so that only the most
recent proxy is used.
"""
for field in self.FORWARDED_FOR_FIELDS:
if field in request.META:
if ',' in request.META[field]:
parts = request.META[field].split(',')
request.META[field] = parts[-1].strip()
class MiddlewareThreadLocal(MiddlewareMixin):
""" Simple middleware that adds the request object in thread local storage."""
def __init__(self, get_response):
self.get_response = get_response
def process_request(self, request):
set_thread_var('request', request)
class MiddlewareAjaxCSRFDisable(MiddlewareMixin):
def __init__(self, get_response):
self.get_response = get_response
def process_request(self, request):
setattr(request, '_dont_enforce_csrf_checks', True)
class MiddlewareFilterPersist(MiddlewareMixin):
def __init__(self, get_response):
self.get_response = get_response
def process_request(self, request):
if '/admin/' not in request.path:
return None
if not request.META.has_key('HTTP_REFERER'):
return None
popup = 'pop=1' in request.META['QUERY_STRING']
path = request.path
query_string = request.META['QUERY_STRING']
session = request.session
if session.get('redirected',
False): # so that we dont loop once redirected
del session['redirected']
return None
referrer = request.META['HTTP_REFERER'].split('?')[0]
referrer = referrer[referrer.find('/admin'):len(referrer)]
key = 'key' + path.replace('/', '_')
if popup:
key = 'popup' + path.replace('/', '_')
if path == referrer: # We are in same page as before
if query_string == '': # Filter is empty, delete it
if session.get(key, False):
del session[key]
return None
# request.session[key] = query_string
request.session[key] = request.GET.copy()
request.session.modified = True
else: # We are are coming from another page, restore filter if available
if session.get(key, False):
# urllib.urlencode
try:
query = request.session.get(key)
query.update(request.GET.copy())
query_string = urllib.urlencode(query)
redirect_to = path + '?' + query_string
request.session['redirected'] = True
return http.HttpResponseRedirect(redirect_to)
except:
pass
# query_string=request.session.get(key)
return None
class MiddlewareSite(MiddlewareMixin):
def __init__(self, get_response):
self.get_response = get_response
def process_request(self, request):
request.site = get_current_site()
class MiddlewareAppData(MiddlewareMixin):
def __init__(self, get_response):
self.get_response = get_response
def process_request(self, request):
from django.conf import settings
request.APP_DATA = settings.APP_DATA
# operate lang switch by url
class MiddlewareLang(MiddlewareMixin):
def __init__(self, get_response):
self.get_response = get_response
def process_request(self, request):
from django.conf import settings
langs = dict(settings.LANGUAGES)
lang = settings.LANGUAGE_CODE.split('-')[0]
path = request.path.split('/')
if len(path) > 1 and path[1] in langs:
request.lang = path[1]
else:
request.lang = lang
class MiddlewareSwitchLocale(LocaleMiddleware):
def __init__(self, get_response):
self.get_response = get_response
def process_request(self, request):
language = request.lang
translation.activate(language)
request.LANGUAGE_CODE = language
class MiddlewareLocale(MiddlewareMixin):
def __init__(self, get_response):
self.get_response = get_response
def process_request(self, request):
sr = re.search('^/([A-Za-z]{2})/', request.path)
lang_dc = dict(item for item in settings.LANGUAGES)
if sr:
request.vt_lang = sr.group(1) if sr and sr.group(
1) in lang_dc else settings.LANGUAGE_CODE
else:
lng = request.META.get('HTTP_ACCEPT_LANGUAGE', '')
request.vt_lang = lng if lng and lng in lang_dc \
else settings.LANGUAGE_CODE
translation.activate(request.vt_lang)
# for django admin
# DEPRECATED!!!
class MiddlewareFilterPersist(MiddlewareMixin):
def __init__(self, get_response):
self.get_response = get_response
def process_request(self, request):
if '/admin/' not in request.path:
return None
if not request.META.has_key('HTTP_REFERER'):
return None
popup = 'pop=1' in request.META['QUERY_STRING']
path = request.path
if path.find('/add/') >= 0:
return None
query_string = request.META['QUERY_STRING']
session = request.session
if session.get('redirected',
False): # so that we dont loop once redirected
del session['redirected']
return None
referrer = request.META['HTTP_REFERER'].split('?')[0]
referrer = referrer[referrer.find('/admin'):len(referrer)]
key = 'key' + path.replace('/', '_')
if popup:
key = 'popup' + path.replace('/', '_')
if path == referrer: # We are in same page as before
if query_string == '': # Filter is empty, delete it
if session.get(key, False):
del session[key]
return None
# request.session[key] = query_string
request.session[key] = request.GET.copy()
request.session.modified = True
else: # We are are coming from another page, restore filter if available
if session.get(key, False):
# urllib.urlencode
try:
query = request.session.get(key)
query.update(request.GET.copy())
query_string = urllib.urlencode(query)
redirect_to = path + '?' + query_string
request.session['redirected'] = True
return http.HttpResponseRedirect(redirect_to)
except BaseException as e:
pass
return None
class AbstractMiddlewareSimplePage(MiddlewareMixin):
def __init__(self, get_response):
self.get_response = get_response
def process_request(self, request, cls, context=None):
try:
if not hasattr(request, 'simple_page'):
request.simple_page = None
url = re.sub(r'/+', '/', '/%s/' % request.path_info.strip('/'),
re.IGNORECASE)
sps = []
# check_re
# check if this seller subdomain
qset = cls.objects.filter(url=url, state=True)
for idx, sp in enumerate(qset):
sps.append(sp)
if len(sps):
request.sps = sps
return render(request, 'apps/frontend/simple_page_middleware.j2')
except cls.DoesNotExist:
request.simple_page = None
class MiddlewareRequest(MiddlewareMixin):
def __init__(self, get_response):
self.get_response = get_response
def process_request(self, request):
# Im not happy on new django changes for future
request.REQUESTS = {}
request.REQUESTS.update(request.POST)
request.REQUESTS.update(request.GET)
for key, val in iteritems(request.REQUESTS):
if isinstance(val, list) and len(val) == 1:
request.REQUESTS[key] = val[0]
class MiddlewareCachedTree(MiddlewareMixin):
def __init__(self, get_response):
self.get_response = get_response
def process_request(self, request):
from django.apps import apps
cls = apps.get_model(settings.CATEGORY_MODEL)
request.cached_tree = CacheTree(cls.tree_get(), request.path)
# -*- coding: utf-8 -*-
import requests
import json
import sys
from operator import itemgetter
if sys.version_info[0] >= 3:
import io
def is_file(obj): return isinstance(obj, io.IOBase)
else:
def is_file(obj): return isinstance(obj, file)
def sort_ldict(ldict, skey):
"""
сортируем список словарей
:param ldict: - список со словарями
:param skey: - ключ словаря
:return:
"""
return sorted(ldict, key=itemgetter(skey))
class ProxWork():
nodes = False
base_url = '/api2/json'
csrf_prevention_token = False
pve_auth_cookie = False
session = False
session_time = False
last_request = False
def __init__(self, server_link, username, password, passtype='pve'):
"""
:param prox_link:
:param username:
:param password:
:param passtype: str - pve или pam
:return:
"""
self.server_link = server_link
self.username = username
self.password = password
self.passtype = passtype
# if not self.session:
self._create_session()
def _make_request(self, method='get', api_url='access/ticket', data=None, params=None, verify_ssl=False, **kwargs):
"""
общий метод для формирования запросов
обращается к серверу и запрашивает данные у api
:param method: get, post, put, delete
:param api_url:
:param data:
:param params:
:param verify_ssl:
:param kwargs:
:return:
"""
url = 'https://{0}{1}/{2}'.format(self.server_link, self.base_url, api_url)
r = self.session.request(method=method, url=url, verify=verify_ssl, data=data or None, params=params)
self.last_request = r
# self._print_request(r)
if r.ok:
return r.json()
else:
return False
def _print_request(self, request):
"""
тестит реквест для requests
"""
print request.reason
print request.content
print request.links
print request.raw
print request.request.headers
print request.request.method
print request.request.path_url
def _create_session(self, verify_ssl=False):
"""
авторизует пользователя проксмокса и создаёт сессию
:param verify_ssl:
"""
s = requests.session()
data = 'username={0}@{1}&password={2}'.format(self.username, self.passtype, self.password)
r = s.post('https://{0}{1}/access/ticket'.format(
self.server_link, self.base_url), data=data, verify=verify_ssl)
if r.ok:
s.cookies['PVEAuthCookie'] = r.json()['data']['ticket']
self.csrf_prevention_token = r.json()['data']['CSRFPreventionToken']
self.pve_auth_cookie = r.json()['data']['ticket']
self.session = s
else:
raise AuthenticationError("Couldn't authenticate user: {0} to {1}".format(
self.username, self.base_url + "/access/ticket"))
def get_nodes(self):
"""
отдаёт список всех нод (физических серверов) в кластере
"""
if not self.nodes:
nodes = self._make_request(api_url='nodes')['data']
self.nodes = nodes
return nodes
else:
return self.nodes
def get_vms_from_node(self, node):
"""
находит все машины у данной ноды
:param node: str - id ноды
"""
vms = self._make_request(api_url='nodes/{0}/qemu/'.format(node))
return vms['data']
def get_vm_config(self, node, vmid):
"""
читает конфиг машины из кластера ноды
:param node: str - id ноды
:param vmid: str - id машины
"""
vm = self._make_request(api_url='nodes/{0}/qemu/{1}/config'.format(node, vmid))
return vm['data']
def get_vms_all(self):
"""
здесь формируется список всех машин кластера для выдачи в темплейт
"""
vms = list()
for node in self.get_nodes():
activ_node = node['node']
for vm in self.get_vms_from_node(node['node']):
activ_vm = {
'node': node['node'],
'vmid': vm['vmid'],
'name': vm['name'],
'status': vm['status'],
'template': vm['template']
}
# так как информация из списка машин не полная надо запросить конфиг конкретной машины
vm_info = self.get_vm_config(node['node'], vm['vmid'])
activ_vm['net'] = vm_info['net0']
vms.append(activ_vm)
sorted_vms = sort_ldict(vms, 'vmid')
return sorted_vms
def get_tasks_bynode(self, node, limit=7):
"""
вывод задач с одной ноды готовых или нет
:param node: str - id ноды
:param limit: int - сколько задач выводить max 50
:return: list - список задач
"""
# tasks = proxmox.nodes(node).tasks.get(limit=limit)
params = {'limit':limit}
tasks = self._make_request(api_url='nodes/{0}/tasks'.format(node), params=params)
return tasks['data']
def get_task_log(self, node, upid):
"""
отдаёт статус одной задачи
:param node: str - id ноды
:param upid: str - id задачи
:return: dict - лог задачи
"""
task = self._make_request(api_url='nodes/{0}/tasks/{1}/status'.format(node, upid))
return task['data']
class AuthenticationError(Exception):
def __init__(self, msg):
super(AuthenticationError, self).__init__(msg)
self.msg = msg
def __str__(self):
return self.msg
def __repr__(self):
return self.__str__()
# todo: REF
class CustomSession(requests.Session):
def request(self, method, url, params=None, data=None, headers=None, cookies=None, files=None, auth=None,
timeout=None, allow_redirects=True, proxies=None, hooks=None, stream=None, verify=None, cert=None,
serializer=None):
#filter out streams
files = files or {}
data = data or {}
for k, v in data.copy().items():
if is_file(v):
files[k] = v
del data[k]
headers = None
if not files and serializer:
headers = {"content-type": 'application/x-www-form-urlencoded'}
return super(CustomSession, self).request(method, url, params, data, headers, cookies, files, auth,
timeout, allow_redirects, proxies, hooks, stream, verify, cert)
# -*- coding: utf-8 -*-
# SMSC.RU API (smsc.ru) версия 1.9
from datetime import datetime
from time import sleep
import smtplib
from django.conf import settings
try:
from urllib import urlopen, quote
except ImportError:
from urllib.request import urlopen
from urllib.parse import quote
# Константы для настройки библиотеки
SMSC_LOGIN = settings.API_SMS_CENTER_LOGIN # логин клиента
SMSC_PASSWORD = settings.API_SMS_CENTER_PASSWORD # пароль или MD5-хеш пароля в нижнем регистре
SMSC_POST = False # использовать метод POST
SMSC_HTTPS = False # использовать HTTPS протокол
SMSC_CHARSET = "utf-8" # кодировка сообщения (windows-1251 или koi8-r), по умолчанию используется utf-8
SMSC_DEBUG = False # флаг отладки
# Константы для отправки SMS по SMTP
SMTP_FROM = "api@smsc.ru" # e-mail адрес отправителя
SMTP_SERVER = "send.smsc.ru" # адрес smtp сервера
SMTP_LOGIN = "" # логин для smtp сервера
SMTP_PASSWORD = "" # пароль для smtp сервера
# Вспомогательная функция, эмуляция тернарной операции ?:
def ifs(cond, val1, val2):
if cond:
return val1
return val2
# Класс для взаимодействия с сервером smsc.ru
class SMSC(object):
# Метод отправки SMS
#
# обязательные параметры:
#
# phones - список телефонов через запятую или точку с запятой
# message - отправляемое сообщение
#
# необязательные параметры:
#
# translit - переводить или нет в транслит (1,2 или 0)
# time - необходимое время доставки в виде строки (DDMMYYhhmm, h1-h2, 0ts, +m)
# id - идентификатор сообщения. Представляет собой 32-битное число в диапазоне от 1 до 2147483647.
# format - формат сообщения (0 - обычное sms, 1 - flash-sms, 2 - wap-push, 3 - hlr, 4 - bin, 5 - bin-hex, 6 - ping-sms, 7 - mms, 8 - mail, 9 - call)
# sender - имя отправителя (Sender ID). Для отключения Sender ID по умолчанию необходимо в качестве имени
# передать пустую строку или точку.
# query - строка дополнительных параметров, добавляемая в URL-запрос ("valid=01:00&maxsms=3")
#
# возвращает массив (<id>, <количество sms>, <стоимость>, <баланс>) в случае успешной отправки
# либо массив (<id>, -<код ошибки>) в случае ошибки
def send_sms(self, phones, message, translit=0, time="", id=0, format=0,
sender=False, query=""):
formats = ["flash=1", "push=1", "hlr=1", "bin=1", "bin=2", "ping=1",
"mms=1", "mail=1", "call=1"]
m = self._smsc_send_cmd("send",
"cost=3&phones=" + quote(phones) + "&mes=" + quote(message) + \
"&translit=" + str(translit) + "&id=" + str(id) + ifs(format > 0,
"&" + formats[
format - 1],
"") + \
ifs(sender == False, "", "&sender=" + quote(str(sender))) + \
ifs(time, "&time=" + quote(time), "") + ifs(query, "&" + query, ""))
# (id, cnt, cost, balance) или (id, -error)
if SMSC_DEBUG:
if m[1] > "0":
print("Сообщение отправлено успешно. ID: " + m[
0] + ", всего SMS: " + m[1] + ", стоимость: " + m[
2] + ", баланс: " + m[3])
else:
print("Ошибка №" + m[1][1:] + ifs(m[0] > "0", ", ID: " + m[0],
""))
return m
# SMTP версия метода отправки SMS
def send_sms_mail(self, phones, message, translit=0, time="", id=0,
format=0, sender=""):
server = smtplib.SMTP(SMTP_SERVER)
if SMSC_DEBUG:
server.set_debuglevel(1)
if SMTP_LOGIN:
server.login(SMTP_LOGIN, SMTP_PASSWORD)
server.sendmail(SMTP_FROM, "send@send.smsc.ru",
"Content-Type: text/plain; charset=" + SMSC_CHARSET + "\n\n" + \
SMSC_LOGIN + ":" + SMSC_PASSWORD + ":" + str(
id) + ":" + time + ":" + str(translit) + "," + \
str(format) + "," + sender + ":" + phones + ":" + message)
server.quit()
# Метод получения стоимости SMS
#
# обязательные параметры:
#
# phones - список телефонов через запятую или точку с запятой
# message - отправляемое сообщение
#
# необязательные параметры:
#
# translit - переводить или нет в транслит (1,2 или 0)
# format - формат сообщения (0 - обычное sms, 1 - flash-sms, 2 - wap-push, 3 - hlr, 4 - bin, 5 - bin-hex, 6 - ping-sms, 7 - mms, 8 - mail, 9 - call)
# sender - имя отправителя (Sender ID)
# query - строка дополнительных параметров, добавляемая в URL-запрос ("list=79999999999:Ваш пароль: 123\n78888888888:Ваш пароль: 456")
#
# возвращает массив (<стоимость>, <количество sms>) либо массив (0, -<код ошибки>) в случае ошибки
def get_sms_cost(self, phones, message, translit=0, format=0, sender=False,
query=""):
formats = ["flash=1", "push=1", "hlr=1", "bin=1", "bin=2", "ping=1",
"mms=1", "mail=1", "call=1"]
m = self._smsc_send_cmd("send",
"cost=1&phones=" + quote(phones) + "&mes=" + quote(message) + \
ifs(sender == False, "", "&sender=" + quote(str(sender))) + \
"&translit=" + str(translit) + ifs(format > 0,
"&" + formats[format - 1],
"") + ifs(query, "&" + query, ""))
# (cost, cnt) или (0, -error)
if SMSC_DEBUG:
if m[1] > "0":
print("Стоимость рассылки: " + m[0] + ". Всего SMS: " + m[1])
else:
print("Ошибка №" + m[1][1:])
return m
# Метод проверки статуса отправленного SMS или HLR-запроса
#
# id - ID cообщения
# phone - номер телефона
#
# возвращает массив:
# для отправленного SMS (<статус>, <время изменения>, <код ошибки sms>)
# для HLR-запроса (<статус>, <время изменения>, <код ошибки sms>, <код IMSI SIM-карты>, <номер сервис-центра>, <код страны регистрации>,
# <код оператора абонента>, <название страны регистрации>, <название оператора абонента>, <название роуминговой страны>,
# <название роумингового оператора>)
#
# При all = 1 дополнительно возвращаются элементы в конце массива:
# (<время отправки>, <номер телефона>, <стоимость>, <sender id>, <название статуса>, <текст сообщения>)
#
# либо массив (0, -<код ошибки>) в случае ошибки
def get_status(self, id, phone, all=0):
m = self._smsc_send_cmd("status",
"phone=" + quote(phone) + "&id=" + str(id) + "&all=" + str(all))
# (status, time, err, ...) или (0, -error)
if SMSC_DEBUG:
if m[1] >= "0":
tm = ""
if m[1] > "0":
tm = str(datetime.fromtimestamp(int(m[1])))
print("Статус SMS = " + m[0] + ifs(m[1] > "0",
", время изменения статуса - " + tm,
""))
else:
print("Ошибка №" + m[1][1:])
if all and len(m) > 9 and (len(m) < 14 or m[14] != "HLR"):
m = (",".join(m)).split(",", 8)
return m
# Метод получения баланса
#
# без параметров
#
# возвращает баланс в виде строки или False в случае ошибки
def get_balance(self):
m = self._smsc_send_cmd("balance") # (balance) или (0, -error)
if SMSC_DEBUG:
if len(m) < 2:
print("Сумма на счете: " + m[0])
else:
print("Ошибка №" + m[1][1:])
return ifs(len(m) > 1, False, m[0])
# ВНУТРЕННИЕ МЕТОДЫ
# Метод вызова запроса. Формирует URL и делает 3 попытки чтения
def _smsc_send_cmd(self, cmd, arg=""):
url = ifs(SMSC_HTTPS, "https",
"http") + "://smsc.ru/sys/" + cmd + ".php"
_url = url
arg = "login=" + quote(SMSC_LOGIN) + "&psw=" + quote(
SMSC_PASSWORD) + "&fmt=1&charset=" + SMSC_CHARSET + "&" + arg
i = 0
ret = ""
while ret == "" and i <= 5:
if i > 0:
url = _url.replace("smsc.ru/", "www" + str(i) + ".smsc.ru/")
else:
i += 1
try:
if SMSC_POST or len(arg) > 2000:
data = urlopen(url, arg.encode(SMSC_CHARSET))
else:
data = urlopen(url + "?" + arg)
ret = str(data.read().decode(SMSC_CHARSET))
except:
ret = ""
i += 1
if ret == "":
if SMSC_DEBUG:
print("Ошибка чтения адреса: " + url)
ret = "," # фиктивный ответ
return ret.split(",")
# Examples:
# smsc = SMSC()
# smsc.send_sms("79999999999", "test", sender="sms")
# smsc.send_sms("79999999999", "http://smsc.ru\nSMSC.RU", query="maxsms=3")
# smsc.send_sms("79999999999", "0605040B8423F0DC0601AE02056A0045C60C036D79736974652E72750001036D7973697465000101", format=5)
# smsc.send_sms("79999999999", "", format=3)
# r = smsc.get_sms_cost("79999999999", "Вы успешно зарегистрированы!")
# smsc.send_sms_mail("79999999999", "test2", format=1)
# r = smsc.get_status(12345, "79999999999")
# print(smsc.get_balance())
import traceback
from collections import OrderedDict
import hashlib
import os
import urllib
# import urlparse
import sys
import re
import codecs
from datetime import datetime, timezone
from uuid import uuid4
from copy import deepcopy
from importlib import import_module
import ujson
import transliterate
from django.forms import model_to_dict
from django.utils.text import slugify
from django.contrib import auth
from django.core.files.base import ContentFile
from django.apps import apps
from django.http import HttpResponse
from django.template.loader import get_template
from django.utils.deconstruct import deconstructible
from django.utils.translation import get_language
from django.conf import settings
from django.core.files.storage import default_storage
from builtins import str
import logging
log = logging.getLogger('vest_utils')
def file_put_contents(filename, data, utf=False):
f = codecs.open(filename, "w", "utf-8-sig") if utf else open(filename, 'w')
f.write(data)
f.close()
def file_get_contents(url):
if os.path.exists(url):
return open(url, 'r').read()
opener = urllib.request.build_opener()
content = opener.open(url).read()
return content
def image_from_url_get(url):
url_data = urllib.parse.urlparse(url)
ext = os.path.splitext(url_data.path)[1]
return ContentFile(urllib.request.urlopen(url).read())
def image_from_url_get_2(url, name_gen=True):
file = image_from_url_get(url)
ext = os.path.splitext(urllib.parse.urlparse(url).path)[1]
if name_gen:
md5 = hashlib.md5()
# md5.update('%s-%s' % (url, datetime.now()))
md5.update('{}-{}'.format(url, datetime.now()).encode('utf-8'))
if not ext:
ext = '.jpg'
file.name = '%s%s' % (md5.hexdigest(), ext.lower())
else:
file.name = os.path.basename(url)
return file
def md5_for_file(f, block_size=2 ** 20):
f = open(f)
md5 = hashlib.md5()
while True:
data = f.read(block_size)
if not data:
break
md5.update(data)
f.close()
return md5
def model_class_get(path):
return apps.get_model(*path.split('.'))
def model_object_get(path, pk):
return model_class_get(path).objects.get(pk=pk)
def class_get(path):
arr = path.split('.')
return getattr(import_module('.'.join(arr[0:-1])), arr[-1])
def qset_to_dict(qset, keys='id', out=None):
res = OrderedDict()
if not isinstance(keys, list):
keys = [keys]
for item in qset:
for key in keys:
key_name = key(item) if hasattr(key, '__call__') else getattr(item,
key)
res[key_name] = out(key, item) if out else item
return res
def vt_model_to_dict(instance, deep_level=100, fields=None, exclude=None):
dc = {}
simple_fields = []
for fld in instance._meta.fields:
if not exclude is None and fld.name in exclude:
continue
if not fields is None and not fld.name in fields:
continue
if fld.is_relation:
value = getattr(instance, fld.name)
if value is None:
dc[fld.name] = None
elif deep_level - 1 >= 0:
dc[fld.name] = vt_model_to_dict(value, deep_level - 1)
else:
dc[fld.name] = str(dc[fld.name])
else:
simple_fields.append(fld.name)
if len(simple_fields):
dc.update(model_to_dict(instance, simple_fields))
return dc
def shop_login(request, username, password):
if auth.SESSION_KEY in request.session:
del request.session[auth.SESSION_KEY]
request.session.modified = True
user = auth.authenticate(username=username, password=password)
if user is not None and user.is_active:
auth.login(request, user)
return user
def exception_details():
exc_type, exc_obj, exc_tb = sys.exc_info()
file_name = os.path.split(exc_tb.tb_frame.f_code.co_filename)[1]
return file_name, exc_type, exc_obj, exc_tb
def dict_sort(dic_or_list, key):
if isinstance(dic_or_list, list):
sorted_x = sorted(dic_or_list, key=lambda x: x[key])
else:
sorted_x = sorted(dic_or_list.items(), key=lambda x: x[1][key])
return sorted_x
def handle_uploaded_file(f, path):
destination = open(path, 'wb+')
for chunk in f.chunks():
destination.write(chunk)
destination.close()
def file_exists(path):
try:
with open(path) as f:
return True
except IOError as e:
return None
first_cap_re = re.compile('(.)([A-Z][a-z]+)')
all_cap_re = re.compile('([a-z0-9])([A-Z])')
def camel_to_underline(name):
s1 = first_cap_re.sub(r'\1_\2', name)
return all_cap_re.sub(r'\1_\2', s1).lower()
def site_map_generate(**kwargs):
urls = kwargs['urls'] # generator
path = kwargs['path'] # sitemap.xml path
domain = kwargs['domain']
sub_path = kwargs.get('sub_path', '')
max_links = kwargs.get('max_links', 50000)
parts = 1
def _xfi():
file_name = 'sitemap{}.xml'.format(parts)
file_path = os.path.join(path, file_name)
file = codecs.open(file_path, 'w+')
xml_start = '''<?xml version="1.0" encoding="UTF-8"?>
<urlset xmlns="http://www.sitemaps.org/schemas/sitemap/0.9">'''
file.write(xml_start)
return file
file = _xfi()
for i, url in enumerate(urls, 1):
file.write('<url>')
for tag in ['loc', 'changefreq', 'priority', 'lastmod']:
if tag in url:
file.write('<{0}>{1}</{0}>'.format(tag, url[tag]))
file.write('</url>')
if i % max_links == 0:
file.write('</urlset>')
file.close()
parts += 1
file = _xfi()
file.write('</urlset>')
file.close()
if parts == 1:
os.rename(os.path.join(path, 'sitemap1.xml'),
os.path.join(path, 'sitemap.xml'))
else:
# iso_date = datetime.now().isoformat()
iso_date = datetime.utcnow().replace(microsecond=0).replace(
tzinfo=timezone.utc).isoformat()
with codecs.open(os.path.join(path, 'sitemap.xml'), 'w+', 'utf-8') as f:
f.write(
'''<?xml version="1.0" encoding="UTF-8"?>
<sitemapindex xmlns="http://www.sitemaps.org/schemas/sitemap/0.9">''')
for i in range(parts):
f.write('''<sitemap>
<loc>{}/{}sitemap{}.xml</loc>
<lastmod>{}</lastmod>
</sitemap>'''.format(domain, sub_path, i + 1, iso_date))
f.write('''</sitemapindex>''')
# TODO: DEPRECATED!!!
class SiteMapGenerator(object):
def __init__(self):
self.file = None
def _write_header(self):
self.file.write('''<?xml version="1.0" encoding="UTF-8"?>''')
def _open_urlset(self):
self.file.write(
'''<urlset xmlns="http://www.sitemaps.org/schemas/sitemap/0.9">''')
def _close_urlset(self):
self.file.write('</urlset>')
def _write_urls(self, urls):
for url in urls:
self.file.write('<url>')
for tag in ['loc', 'changefreq', 'priority', 'lastmod']:
if tag in url:
self.file.write('<{0}>{1}</{0}>'.format(tag, url[tag]))
self.file.write('</url>')
def generate(self, path, **kwargs):
try:
self.file = open(path, 'w+')
self._write_header()
self._open_urlset()
self._write_urls(kwargs['urls'])
self._close_urlset()
except BaseException as e:
ex = traceback.format_exc()
log.error(ex)
return {'success': False, 'error': ex}
finally:
self.file.close()
return {'success': True}
def dict_merge(target, *args):
# Merge multiple dicts
if len(args) > 1:
for obj in args:
dict_merge(target, obj)
return target
# Recursively merge dicts and set non-dict values
obj = args[0]
if not isinstance(obj, dict):
return obj
for k, v in obj.items():
if k in target and isinstance(target[k], dict):
dict_merge(target[k], v)
else:
target[k] = deepcopy(v)
return target
def form_file_save(file, path):
name, ext = os.path.splitext(file.name)
name = '%s%s' % (str(uuid4()), ext)
file_path = '%s/%s' % (path, name)
full_path = default_storage.save(file_path, ContentFile(file.read()))
return file_path, full_path
@deconstructible
class UploadPath(object):
def __init__(self, *args, **kwargs):
self.base_path = kwargs['base_path']
self.field_name = kwargs.get('field_name')
self.name_gen = kwargs.get('name_gen')
def __call__(self, instance, file_name):
name = file_name
if self.name_gen:
name, ext = os.path.splitext(file_name)
name = '%s%s' % (str(uuid4()), ext)
if self.field_name:
field = getattr(instance, self.field_name)
path = os.path.join(self.base_path,
field.slug if hasattr(field, 'slug') else str(
field.pk))
else:
path = self.base_path
full_path = os.path.join(settings.MEDIA_ROOT, path)
if not os.path.exists(full_path):
os.makedirs(full_path)
path = os.path.join(path, name)
return path
# return path.format(instance.user.username, self.sub_path, filename)
def lang_get():
lng = get_language()
if not lng is None:
return get_language().split('-')[0]
return 'ru'
def slugify_ru(in_str, language_code=None):
try:
return slugify(
transliterate.translit(in_str, language_code=language_code,
reversed=True))
except Exception as e:
return slugify(
transliterate.translit(str(in_str), language_code=language_code,
reversed=True))
def ujson_response(x):
return HttpResponse(ujson.dumps(x),
content_type='application/json; charset=UTF-8')
def ex_find_template(name, exclude=[], dirs=None):
using = None
if len(exclude):
using = []
for loader_name in settings.TEMPLATE_LOADERS:
if loader_name in exclude:
continue
using.append(loader_name)
return get_template(name)
# language prefix
vt_lang_prefix = lambda url: r'^([A-Za-z]{2}/)?%s' % url
vt_kw_lang_prefix = lambda url: r'^(?P<lng>[A-Za-z]{2}/)?%s' % url
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment