Skip to content

Instantly share code, notes, and snippets.

@nazarred
Last active November 2, 2018 14:47
Show Gist options
  • Save nazarred/8c51a4b59d13591399e5d4bdf8a7d7c9 to your computer and use it in GitHub Desktop.
Save nazarred/8c51a4b59d13591399e5d4bdf8a7d7c9 to your computer and use it in GitHub Desktop.
examples
from __future__ import unicode_literals
from datetime import date, timedelta
from decimal import Decimal as D
from django.utils import timezone
from apps.applications.openexchangerates import get_or_update_rates_for_date
from apps.pricing.rounding_tools import nice_price_rounding_country
def D_FROM_FLOAT(x):
    """
    Build a Decimal from ``x`` by going through its string form first.

    Passing a float straight to Decimal does not raise; it converts the
    float's exact binary value, representation error included. Example:

    >>> Decimal(0.3) * 3
    Decimal('0.8999999999999999666933092612')
    >>> Decimal("0.3") * 3
    Decimal('0.9')
    """
    as_text = str(x)
    return D(as_text)
def calculate_ppp_rate(ppp, percentage):
    """
    Pull a PPP factor towards the neutral value 1 by a slider percentage.

    :ppp: the PPP factor (Decimal; ints/floats are converted via ``str``).
    :percentage: slider value on a 0-100 scale. 100 returns ``ppp``
        unchanged, 0 returns 1, values in between interpolate linearly.
    :returns: the adjusted factor as a Decimal.
    """
    # Normalise non-Decimal input through ``str`` so a float never gets mixed
    # with Decimal arithmetic (float +/- Decimal raises TypeError).
    if not isinstance(ppp, D):
        ppp = D(str(ppp))
    if not isinstance(percentage, D):
        percentage = D(str(percentage))

    # reverse the percentage
    # so we can have slider that goes 1-100 but we need 100-x for calculations
    damping = (D("100.00") - percentage) / D("100.00")

    # (1 - ppp) is the signed distance to the neutral factor; adding the
    # damped distance moves ppp towards 1 whether it starts above or below.
    return ppp + (D(1) - ppp) * damping
def calculate_price_matrix(
    for_countries, start_price, start_country,
    exchange_rates_date=None,
    enable_ppp=False, ppp_percentage=D(0),
    ppp_limit='both', ppp_increase_percentage=None,
    nice_prices=False, rounding_method=round
):
    """
    Convert ``start_price`` of ``start_country`` into a price for every
    country in ``for_countries``.

    :for_countries: mapping of key -> country object. Each country is read
        for ``ppp``, ``currency_iso_code``, ``round_to`` and ``iso_code``.
    :start_price: Decimal price in the currency of ``start_country``.
    :start_country: key into ``for_countries`` selecting the base country.
    :exchange_rates_date: date of the exchange rates to use; falsy means
        "today", resolved at call time.
    :enable_ppp: when true, each converted price is multiplied by that
        country's (percentage-adjusted) PPP factor.
    :ppp_percentage: slider value handed to ``calculate_ppp_rate``.
    :ppp_limit: 'down' caps every PPP factor at 1; 'limit_up' caps it at
        ``1 + ppp_increase_percentage / 100`` (when that is given);
        'both' applies no cap.
    :nice_prices: when true, prices go through
        ``nice_price_rounding_country`` (for example 1.99 instead of 2.00);
        otherwise they are quantized to ``country.round_to``.
    :rounding_method: accepted for backward compatibility; not used by
        this implementation.
    :returns: dict such as ``{"GB": {"price": "0.59"}}`` keyed by
        ``country.iso_code``.
    :raises KeyError: if ``start_country`` or a currency code is missing.
    """
    # Every mode except 'down' ('both', 'limit_up') lets PPP raise a price.
    ppp_increase = ppp_limit != 'down'

    # Resolve "today" per call: a ``date.today()`` default argument would be
    # evaluated once at import time and go stale in a long-running process.
    if not exchange_rates_date:
        exchange_rates_date = date.today()
    exchange_rates = get_or_update_rates_for_date(
        rates_date=exchange_rates_date
    )
    rates = exchange_rates.openexchangerates_dump

    # convert input price to USD, so we need to know only exchange rates to
    # USD (no need for exchange rates like GBP -> PLN) + PPP is defined for
    # USD.
    default_country = for_countries[start_country]
    exchange_rate_to_usd = rates[default_country.currency_iso_code]

    # normalize default price
    # NOTE(review): this division happens even when enable_ppp is False, so
    # a non-zero ppp_percentage shifts prices with PPP disabled; the limit
    # caps below are not applied to default_ppp either. Confirm intent.
    default_ppp = calculate_ppp_rate(default_country.ppp, ppp_percentage)
    start_price = start_price / default_ppp
    # Rates are converted through str (D_FROM_FLOAT) so a float rate does not
    # drag its binary representation error into the Decimal maths.
    price_in_usd = start_price / D_FROM_FLOAT(exchange_rate_to_usd)

    ppp_max = None
    if ppp_increase_percentage:
        ppp_max = D(1) + ppp_increase_percentage / D(100)
    cap_increase = ppp_limit == 'limit_up' and ppp_max is not None

    output = {}
    cent = D('0.01')
    # iterate over all available countries, and calculate the output price
    # for each one. ``values()`` works on both Python 2 and 3.
    for country in for_countries.values():
        ppp = calculate_ppp_rate(country.ppp, ppp_percentage)
        if not ppp_increase and ppp > 1:
            ppp = D(1)
        if cap_increase and ppp > ppp_max:
            ppp = ppp_max

        rounding = D(country.round_to)

        # USD -> local currency
        price = price_in_usd * D_FROM_FLOAT(rates[country.currency_iso_code])

        # apply parity only if enable_ppp is set
        if enable_ppp:
            price = price * ppp
        price = price.quantize(cent)

        if nice_prices:
            result = nice_price_rounding_country(price, country.iso_code)
        else:
            result = str(price.quantize(rounding))

        # invariant: output["GB"] = {"price": "0.59"}
        output[country.iso_code] = {"price": result}
    return output
def create_sample_price_matrix(
    original_matrix, enable_ppp=True, psychological_pricing=False,
    mark_live=True
):
    """
    Clone ``original_matrix``, recalculate the clone with the given options
    and place it one position after the original.

    When ``mark_live`` is true the clone's ``date_live`` is set to now and
    ``last_exported`` to one second earlier. Returns the clone.
    """
    position_after_original = original_matrix.order + 1

    sample = original_matrix.clone()
    sample.enable_ppp = enable_ppp
    sample.psychological_pricing = psychological_pricing
    sample.recalculate(clear_locked_prices=True)

    if mark_live:
        live_at = timezone.now()
        sample.last_exported = live_at - timedelta(seconds=1)
        sample.date_live = live_at
    # NOTE(review): source indentation was lost; save() is assumed to run
    # regardless of mark_live - confirm against the original file.
    sample.save()
    sample.to(position_after_original)
    return sample
from flask_wtf import FlaskForm
from wtforms import StringField, PasswordField, SubmitField, BooleanField
from wtforms.validators import DataRequired, Length, EqualTo, Email, ValidationError
from project.models import User
class RegisterForm(FlaskForm):
    """Registration form: email, password and a matching confirmation."""

    email = StringField(
        'Email',
        validators=[DataRequired(), Email(), Length(min=6, max=100)],
    )
    password = PasswordField(
        'Password',
        validators=[DataRequired(), Length(min=6, max=40)],
    )
    confirm = PasswordField(
        'Confirm Password',
        validators=[DataRequired(), EqualTo('password')],
    )
    submit = SubmitField('Register')

    def validate_email(self, email):
        """Reject the submitted address when a User row already uses it."""
        existing_user = User.query.filter_by(email=email.data).first()
        if existing_user is None:
            return
        raise ValidationError('Please use a different email address.')
class LoginForm(FlaskForm):
    """Login form: email, password and an optional "remember me" flag."""

    email = StringField(
        'Email',
        validators=[DataRequired(), Email(), Length(min=6, max=100)],
    )
    password = PasswordField('Password', validators=[DataRequired()])
    remember_me = BooleanField('Remember Me')
    submit = SubmitField('Login')
# coding: utf-8
from marshmallow import (Schema, fields, validates, ValidationError,)
from .mail_chimp_handlers import (api_key_valid,)
class Contact(Schema):
    """Postal contact details; every field is a required string."""

    company = fields.String(required=True)
    address1 = fields.String(required=True)
    city = fields.String(required=True)
    state = fields.String(required=True)
    zip = fields.String(required=True)
    country = fields.String(required=True)
class CampaignDefaults(Schema):
    """Default sender/subject/language values; all required strings."""

    from_name = fields.String(required=True)
    from_email = fields.String(required=True)
    subject = fields.String(required=True)
    language = fields.String(required=True)
class Member(Schema):
    """A list member: an email address plus a subscription status."""

    # The only values accepted for ``status``.
    STATUSES = ['subscribed', 'unsubscribed', 'cleaned']

    email_address = fields.Email(required=True)
    status = fields.String(required=True)

    @validates('status')
    def validates_status(self, data):
        """Raise ValidationError unless ``data`` is one of STATUSES."""
        if data in self.STATUSES:
            return
        raise ValidationError("'status' field should be 'subscribed', 'unsubscribed' or 'cleaned'")
class EmailList(Schema):
    """An email list: name, permission reminder and nested sub-schemas."""

    name = fields.String(required=True)
    permission_reminder = fields.String(required=True)
    # Nested objects validated by the schemas declared in this module.
    contact = fields.Nested(Contact)
    campaign_defaults = fields.Nested(CampaignDefaults)
    members = fields.Nested(Member, many=True)
class APIKeySerializer(Schema):
    """Schema carrying a single required ``api_key`` string."""

    api_key = fields.String(required=True)

    @validates('api_key')
    def validates_api_key(self, api_key):
        """Reject an empty key, then defer to ``api_key_valid`` for the rest."""
        if not api_key:
            raise ValidationError("The field should not be an empty string")
        # api_key_valid returns a (valid, message) pair.
        is_valid, reason = api_key_valid(api_key)
        if is_valid:
            return
        raise ValidationError(reason)
from project import db, bcrypt
from datetime import datetime
class User(db.Model):
    """
    A user account of the application, stored in the ``users`` table.

    Stored attributes:
        email address (unique)
        password (hashed using Bcrypt)
        authenticated flag (indicates if a user is logged in or not)
        date that the user registered on
        role (defaults to 'user')
    """

    __tablename__ = 'users'

    id = db.Column(db.Integer, primary_key=True, autoincrement=True)
    email = db.Column(db.String, unique=True, nullable=False)
    hashed_password = db.Column(db.Binary(60), nullable=False)
    authenticated = db.Column(db.Boolean, default=False)
    registered_on = db.Column(db.DateTime, nullable=True)
    role = db.Column(db.String, default='user')

    def __init__(self, email, plaintext_password, role='user'):
        """Create an unauthenticated user registered now; the password is hashed."""
        self.email = email
        self.hashed_password = bcrypt.generate_password_hash(plaintext_password)
        self.authenticated = False
        self.registered_on = datetime.now()
        self.role = role

    def set_password(self, plaintext_password):
        """Replace the stored hash with the Bcrypt hash of a new password."""
        new_hash = bcrypt.generate_password_hash(plaintext_password)
        self.hashed_password = new_hash

    def is_correct_password(self, plaintext_password):
        """Check ``plaintext_password`` against the stored hash."""
        stored_hash = self.hashed_password
        return bcrypt.check_password_hash(stored_hash, plaintext_password)

    @property
    def is_authenticated(self):
        """Return True if the user is authenticated."""
        return self.authenticated

    @property
    def is_active(self):
        """Always True, as all users are active."""
        return True

    @property
    def is_anonymous(self):
        """Always False, as anonymous users aren't supported."""
        return False

    def get_id(self):
        """Return the id of a user to satisfy Flask-Login's requirements."""
        user_id = self.id
        return str(user_id)

    def __repr__(self):
        return '<User {}>'.format(self.email)
from django.conf import settings
from django.core.urlresolvers import reverse
from django.core.validators import RegexValidator
from django.db import models
from django.db.models import Count
from django.utils.translation import gettext_lazy as _
from django.core.exceptions import ValidationError
from django.utils.functional import cached_property
import reversion
from reversion.models import Version
from autoslug import AutoSlugField
from djchoices import DjangoChoices, ChoiceItem
from ordered_model.models import OrderedModel
from apps.common.models import TimeStampedModel
from .forms import StatusForm
from apps.users.models import User
from apps.utils import mixins, expression_handler
from dirtyfields import DirtyFieldsMixin
# functions for populating slugs
def slug_period_populator(instance):
    """Return the slug source "<name> <designator>" for ``instance``."""
    parts = (str(instance.name), str(instance.designator))
    return " ".join(parts)
def slug_title_populator(instance):
    """Return the instance's ``title``, used as the slug source."""
    slug_source = instance.title
    return slug_source
# Upper case char field, for colour hex codes
# See http://stackoverflow.com/questions/19498740/how-can-i-make-all-charfield-in-uppercase-direct-in-model
class UpperCaseCharField(models.CharField):
    """CharField that upper-cases a non-empty value right before saving."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def pre_save(self, model_instance, add):
        """Upper-case the attribute on the instance and return the new value.

        Empty/None values fall through to the default CharField handling.
        """
        current = getattr(model_instance, self.attname, None)
        if not current:
            return super().pre_save(model_instance, add)
        upper_cased = current.upper()
        setattr(model_instance, self.attname, upper_cased)
        return upper_cased
class PeriodSystems(DjangoChoices):
    """Choices for a project's period system: calendar or Australian FY."""

    Calendar = ChoiceItem('MO')
    Australian_FY = ChoiceItem('AU')

    @staticmethod
    def name_designators(designator):
        """Return MONTH_NAMES for the Calendar system, QUARTER_NAMES otherwise."""
        if designator != PeriodSystems.Calendar:
            return settings.QUARTER_NAMES
        return settings.MONTH_NAMES
"""
On saving a project, signal listeners in signals.py will create the required statuses and colours
"""
class ProjectManager(models.Manager):
    """Manager for Project adding a "default project" lookup."""

    def get_default_project(self):
        """Return the project with the lowest ``order``, or None if there is none."""
        by_order = self.order_by('order')
        return by_order.first()
class Project(TimeStampedModel, OrderedModel):
    """An ordered project with a title, slug, period system and optional logo."""

    objects = ProjectManager()

    title = models.CharField(max_length=30, blank=False)
    slug = AutoSlugField(populate_from=slug_title_populator, unique=True)
    quarter_system = models.CharField(
        max_length=2,
        choices=PeriodSystems.choices,
        validators=[PeriodSystems.validator],
        default=PeriodSystems.Australian_FY,
    )
    updated_by = models.ForeignKey(User)
    logo = models.ImageField(upload_to='logo/', blank=True, null=True)

    def __str__(self):
        return self.title

    def get_logo(self):
        """Return the uploaded logo's URL, or the static default logo path."""
        if not self.logo:
            return "/static/images/default_logo.png"
        return self.logo.url

    class Meta:
        ordering = ('order',)
class Colour(TimeStampedModel, OrderedModel):
    """An ordered colour stored as a six-digit hex code (no leading '#')."""

    # Validators run before UpperCaseCharField.pre_save upper-cases the value,
    # so the pattern must accept lower-case digits too; it is anchored so
    # only a complete six-digit code passes.
    # NOTE(review): validators are part of the field's migration state, so
    # this produces a new (schema-neutral) AlterField migration.
    value = UpperCaseCharField(
        max_length=6,
        validators=[RegexValidator(regex=r"^[0-9A-Fa-f]{6}\Z")],
    )
class StatusColour(Colour):
    """Colour subclass referenced by Status; adds no fields of its own."""
class WorkStreamColourManager(models.Manager):
    """Manager that can pick the next colour for a new work stream."""

    def get_next_colour(self):
        """Return the colour used by the fewest work streams (ties by ``order``).

        Indexes the queryset with [0], so it raises IndexError when no
        colours exist.
        """
        by_usage = self.annotate(times_used=Count('workstream'))
        ranked = by_usage.order_by('times_used', 'order')
        return ranked[0]
class WorkStreamColour(Colour):
    """Colour subclass referenced by WorkStream."""

    objects = WorkStreamColourManager()
class Status(TimeStampedModel, OrderedModel):
    """A titled, coloured status belonging to a project, ordered per project."""

    title = models.CharField(max_length=30)
    slug = AutoSlugField(
        populate_from=slug_title_populator,
        unique_with='project',
    )
    colour = models.ForeignKey(StatusColour)
    project = models.ForeignKey(Project)

    # OrderedModel setting: ordering is kept separately for each project.
    order_with_respect_to = 'project'

    def __str__(self):
        return self.title

    class Meta:
        ordering = ('order',)
class StrategicPriority(TimeStampedModel, OrderedModel):
    """A titled strategic priority belonging to a project, ordered per project."""

    title = models.CharField(max_length=30)
    slug = AutoSlugField(
        populate_from=slug_title_populator,
        unique_with='project',
    )
    project = models.ForeignKey(Project)

    # OrderedModel setting: ordering is kept separately for each project.
    order_with_respect_to = 'project'

    def __str__(self):
        return self.title

    class Meta:
        ordering = ('order',)
class WorkStreamManager(models.Manager):
    """Manager for WorkStream adding a per-project default lookup."""

    def get_default_workstream(self, project_slug):
        """Return the first work stream of the given project, or None."""
        in_project = self.filter(project__slug=project_slug)
        return in_project.first()
@reversion.register(fields=['status'])
class WorkStream(mixins.RevisionHelperMixin, DirtyFieldsMixin, TimeStampedModel, OrderedModel):
    """
    A work stream inside a project.

    ``status`` holds the percentage of the stream's activities that are
    on-track; changes to it are versioned with django-reversion.
    """

    ON_TRACK = 'on-track'
    MONITOR = 'monitor'
    STOPINTERVENE = 'stopintervene'

    # Lower bounds (in percent) used by display_status().
    ON_TRACK_THRESHOLD = 60
    MONITOR_THRESHOLD = 30
    STOPINTERVENE_THRESHOLD = 0

    STATUSES = (
        (ON_TRACK, 'On-Track'),
        (MONITOR, 'Monitor'),
        (STOPINTERVENE, 'Stop/Intervene')
    )

    objects = WorkStreamManager()

    project = models.ForeignKey(Project)
    title = models.CharField(max_length=30)
    slug = AutoSlugField(populate_from=slug_title_populator, unique=True)
    colour = models.ForeignKey(WorkStreamColour)
    status = models.DecimalField(default=0, max_digits=5, decimal_places=2)

    # OrderedModel setting: ordering is kept separately for each project.
    order_with_respect_to = 'project'

    class Meta:
        ordering = ('order',)

    def __str__(self):
        return self.title

    def save(self, *args, **kwargs):
        """Save, assigning a colour if none is set and versioning status changes."""
        if not hasattr(self, 'colour'):
            self.colour = WorkStreamColour.objects.get_next_colour()

        status_changed = 'status' in self.get_dirty_fields()
        if not status_changed:
            super().save(*args, **kwargs)
            return
        # Only a changed status is worth a reversion revision.
        with reversion.create_revision():
            super().save(*args, **kwargs)

    def display_status(self):
        """Map the numeric ``status`` onto ON_TRACK / MONITOR / STOPINTERVENE."""
        if self.status >= self.ON_TRACK_THRESHOLD:
            return self.ON_TRACK
        if self.status >= self.MONITOR_THRESHOLD:
            return self.MONITOR
        return self.STOPINTERVENE

    @property
    def previous_status(self):
        """Status stored in the previous version, or None when there is none."""
        earlier = self.get_previous_version()
        if not earlier:
            return None
        return earlier.field_dict['status']

    def set_status(self):
        """Recompute ``status`` from the activities and save."""
        self.status = self.get_status_percent()
        self.save()

    def get_status_percent(self):
        """Return the percentage of activities that are 'on-track' (0 if none)."""
        total = self.get_activities().count()
        if not total:
            return 0
        on_track = self.get_activities_by_status('on-track').count()
        return on_track / total * 100

    def get_activities(self):
        """Return all activities attached to this work stream."""
        return self.activity_set.all()

    def get_activities_by_status(self, status):
        """Return this stream's activities whose ``current_status`` equals ``status``."""
        return self.get_activities().filter(current_status=status)

    def get_activities_amount_by_status(self):
        """Return one dict per Activity status with amount, display_name, value."""
        return [
            {
                'amount': self.get_activities_by_status(value).count(),
                'display_name': display_name,
                'value': value,
            }
            for value, display_name in Activity.STATUSES
        ]

    def get_activities_amount(self):
        """Return the number of activities in this work stream."""
        return self.get_activities().count()
# Activity fields passed to reversion.register(fields=...) for the Activity model.
ACTIVITY_METRICS_TRACKED = ['status', 'current_status']
class ActivityManager(models.Manager):
    """Manager that hides archived activities unless built with archive=True."""

    def __init__(self, *args, **kwargs):
        # ``archive`` is this manager's own option; strip it before delegating.
        self.archive = kwargs.pop('archive', False)
        super().__init__(*args, **kwargs)

    def get_queryset(self):
        """Return all rows when ``archive`` is set, else only archive=False rows."""
        everything = super().get_queryset()
        if self.archive:
            return everything
        return everything.filter(archive=False)
@reversion.register(fields=ACTIVITY_METRICS_TRACKED)
class Activity(mixins.RevisionHelperMixin, DirtyFieldsMixin, TimeStampedModel):
    """
    An activity inside a work stream.

    ``objects`` hides archived rows; ``all_objects`` includes them.
    Changes to ``current_status`` are versioned with django-reversion and
    trigger a recalculation of the parent work stream's status.
    """

    FIELDS_TO_CHECK = ['current_status']

    ON_TRACK = 'on-track'
    MONITOR = 'monitor'
    STOPINTERVENE = 'stopintervene'
    PLANNED_ACTIVITY = 'planned_activity'
    STATUSES = (
        (ON_TRACK, 'On-Track'),
        (MONITOR, 'Monitor'),
        (STOPINTERVENE, 'Stop/Intervene'),
        (PLANNED_ACTIVITY, 'Planned Activity'),
    )

    title = models.CharField(max_length=30)
    slug = AutoSlugField(populate_from=slug_title_populator, unique=True)
    description = models.TextField()
    work_stream = models.ForeignKey(WorkStream)
    strategic_priorities = models.ManyToManyField(StrategicPriority, blank=True)
    status = models.ForeignKey(Status)
    completed = models.BooleanField(default=False)
    contacts = models.ManyToManyField(User)
    sponsor = models.ForeignKey(User, related_name='activities', blank=True, null=True)
    archive = models.BooleanField(default=False)
    current_status = models.CharField(max_length=30, default=ON_TRACK, choices=STATUSES)
    start_at = models.DateField()
    end_at = models.DateField()

    objects = ActivityManager()
    all_objects = ActivityManager(archive=True)

    def __str__(self):
        return self.title

    def save(self, force_insert=False, force_update=False, using=None,
             update_fields=None):
        """Save; on a ``current_status`` change, version it and refresh the work stream."""
        if 'current_status' not in self.get_dirty_fields():
            super().save(force_insert, force_update, using, update_fields)
            return
        with reversion.create_revision():
            super().save(force_insert, force_update, using, update_fields)
            # NOTE(review): source indentation was lost; set_status() is
            # assumed to run inside the revision block - confirm.
            self.work_stream.set_status()

    def get_duxeboard_url(self):
        """Return the dashboard URL that opens this activity in the client-side router."""
        base_url = reverse(
            'dashboard:project-view',
            kwargs={'project': self.work_stream.project.slug},
        )
        react_router_url = '#/view/_all'
        query_string = 'act[slug]={activity_slug}&act[workstream_slug]={activity_work_stream_slug}'.format(
            activity_slug=self.slug,
            activity_work_stream_slug=self.work_stream.slug
        )
        return '{base_url}{react_router_url}?{query_string}'.format(
            base_url=base_url,
            react_router_url=react_router_url,
            query_string=query_string,
        )

    def get_edit_url(self):
        """Return the CMS detail URL for this activity."""
        url_kwargs = {'project': self.work_stream.project.slug, 'activity': self.slug}
        return reverse('cms:viewall-activity-detail', kwargs=url_kwargs)

    def get_status_form(self):
        """Return a StatusForm bound to this activity's status, limited to its project."""
        project_statuses = Status.objects.filter(project=self.work_stream.project)
        return StatusForm(qs=project_statuses, data={
            'status': self.status.pk
        })

    def get_revision_history(self, start=None, end=None):
        """Return {revision id: {'date', 'username', <metric label>..., 'status'}}.

        Combines the versions of this activity's values with the versions of
        the activity itself, as returned by ``get_versions(start, end)``.
        """
        # Mapping of revision id to metrics and their values
        history = {}

        def entry_for(revision):
            # Create the per-revision entry the first time a revision is seen.
            if revision.id not in history:
                history[revision.id] = {
                    'date': revision.date_created.strftime("%Y-%m-%d %H:%M:%S %Z"),
                    'username': revision.user.username if revision.user is not None else '[System]'
                }
            return history[revision.id]

        for value in self.values.all():
            for version in value.get_versions(start, end):
                # NOTE(review): the Metric model in this file declares ``name``
                # but no ``label``; confirm ``label`` exists (e.g. on a base class).
                entry_for(version.revision)[value.metric.label] = version.field_dict['value']

        for status_version in self.get_versions(start, end):
            status = Status.objects.get(id=status_version.field_dict['status_id'])
            entry_for(status_version.revision)['status'] = status.title
        return history

    def get_status_colour(self):
        """Return the colour configured in settings for ``current_status``."""
        status_settings = settings.ACTIVITY_DEFAULT_STATUSES[self.current_status]
        return status_settings['COLOUR']
class Unit(TimeStampedModel):
    """A unit of measure with an optional display prefix and suffix."""

    name = models.CharField(max_length=15)
    prefix = models.CharField(max_length=15, null=True, blank=True)
    suffix = models.CharField(max_length=15, null=True, blank=True)

    def __str__(self):
        return self.name
class Metric(TimeStampedModel):
    """A named metric measured in a Unit, with optional bounds and formula."""

    unit = models.ForeignKey(Unit)
    name = models.CharField(max_length=255)
    formula = models.CharField(max_length=255, blank=True)
    min_value = models.DecimalField(null=True, max_digits=19, decimal_places=10)
    max_value = models.DecimalField(null=True, max_digits=19, decimal_places=10)
    decimal_places = models.SmallIntegerField(null=True)
    rollup_metric = models.BooleanField(default=True)

    def __str__(self):
        return self.name

    @classmethod
    def get_metrics_names(cls):
        """Return all metric names lower-cased, sorted by name descending."""
        names = cls.objects.all().order_by('-name').values_list('name', flat=True)
        return [metric_name.lower() for metric_name in names]

    @classmethod
    def get_metrics_names_and_ids(cls):
        """Return a dict mapping each lower-cased metric name to its id."""
        rows = cls.objects.all().values_list('name', 'id')
        return {metric_name.lower(): pk for metric_name, pk in rows}
@reversion.register(fields=['value'])
class Value(mixins.RevisionHelperMixin, TimeStampedModel):
    """A metric's value for one activity; every save creates a revision."""

    metric = models.ForeignKey(Metric, related_name='values')
    activity = models.ForeignKey(Activity, related_name='values')
    value = models.DecimalField(max_digits=19, decimal_places=10)

    def save(self, force_insert=False, force_update=False, using=None,
             update_fields=None):
        """Save inside a reversion revision."""
        with reversion.create_revision():
            super().save(force_insert, force_update, using, update_fields)

    def clean(self):
        """Validate ``value`` against the metric's optional min/max bounds."""
        super().clean()
        upper_bound = self.metric.max_value
        if upper_bound is not None and upper_bound < self.value:
            raise ValidationError(_('"value" field should be smaller than "max_value" field of Metric model'))
        lower_bound = self.metric.min_value
        if lower_bound is not None and lower_bound > self.value:
            raise ValidationError(_('"value" field should be greater than "min_value" field of Metric model'))

    @property
    def previous_value(self):
        """Value stored in the previous version, or None when there is none."""
        earlier = self.get_previous_version()
        if not earlier:
            return None
        return earlier.field_dict['value']
class ResourceCategory(TimeStampedModel):
    """A uniquely named category of resources, ordered by name."""

    name = models.CharField(max_length=40, unique=True)

    class Meta:
        ordering = ('name',)

    def __str__(self):
        return self.name
class Resource(TimeStampedModel):
    """An uploaded file in a category, linked to any number of activities."""

    file_name = models.CharField(max_length=40)
    resource_category = models.ForeignKey(
        ResourceCategory,
        related_name='resources',
    )
    file = models.FileField(upload_to='resources/')
    activities = models.ManyToManyField(Activity, related_name='resources')

    def __str__(self):
        return self.file_name
class BaseFormulaModel(models.Model):
    """Abstract base for models exposing ``name`` and ``value``."""

    def __str__(self):
        # Rendered as "<name>=<value>".
        return '{name}={value}'.format(name=self.name, value=self.value)

    class Meta:
        abstract = True
class Variable(BaseFormulaModel):
    """A uniquely named decimal constant."""

    name = models.CharField(max_length=30, unique=True)  # TODO/FIXME
    value = models.DecimalField(max_digits=19, decimal_places=10)
class VariableExpression(BaseFormulaModel):
    """One-to-one wrapper exposing an Expression's ``name`` and ``value``."""

    expression = models.OneToOneField('core.Expression')

    @property
    def name(self):
        """Name of the wrapped expression."""
        wrapped = self.expression
        return wrapped.name

    @property
    def value(self):
        """Value of the wrapped expression."""
        wrapped = self.expression
        return wrapped.value
class Expression(BaseFormulaModel):
    """A named expression string evaluated over variables and other expressions."""

    name = models.CharField(max_length=30, unique=True)
    expression_string = models.CharField(max_length=255)
    variables = models.ManyToManyField(Variable, related_name='variables', blank=True)
    expressions = models.ManyToManyField(
        VariableExpression,
        related_name='variable_expressions',
        blank=True,
    )

    def save(self, force_insert=False, force_update=False, using=None,
             update_fields=None):
        """Save; on first save also create the matching VariableExpression."""
        # No id yet means this save inserts the row.
        is_new = not self.id
        super().save(force_insert, force_update, using, update_fields)
        if is_new:
            VariableExpression.objects.create(expression=self)

    @cached_property
    def value(self):
        """Evaluate ``expression_string`` with ExpressionHandler (cached per instance)."""
        handler = expression_handler.ExpressionHandler(
            self.expression_string,
            self.variables.all(),
            self.expressions.all(),
        )
        return handler.calculate()
import json
from json import JSONDecodeError
from django.db.models import Prefetch
from django.core.exceptions import ObjectDoesNotExist
from django.core.mail import send_mail
from django.http import Http404
from django.http import JsonResponse, HttpResponseBadRequest
from django.urls import reverse
from django.views import View
from django.contrib.auth.mixins import LoginRequiredMixin
from django.views.generic import RedirectView
from django.conf import settings
from rest_framework.views import APIView
from rest_framework.response import Response
from rest_framework import viewsets
from rest_framework import mixins
from rest_framework import permissions
from rest_framework import filters as rest_framework_filters
import django_filters
from apps.core.serializers import (
ProjectSerializer, ProjectLogoSerializer, WorkStreamSerializer, ActivitySerializer,
StatusSerializer, ColourSerializer, StrategicPrioritySerializer, TimelineActivitySerializer,
ResourceCategorySerializer, ResourceSerializer, ActivityResourcesCategorySerializer, UnitSerializer,
MetricSerializer, ValueSerializer, ValueListRetrieveSerializer, VariableSerializer, ExpressionSerializer
)
from apps.core.models import (
Project, WorkStream, Activity,
Status, Colour, WorkStreamColour,
StatusColour, StrategicPriority, ResourceCategory,
Resource, Unit, Metric, Value, Variable, Expression
)
from apps.users.models import User
from apps.users.serializers import UserSerializer
from .viewset_mixins import ScopeMixin
from . import filters
class DuxeVersionView(APIView):
    # Authenticated-only endpoint reporting settings.DUXE_VERSION.
    permission_classes = (permissions.IsAuthenticated,)

    def get(self, request):
        # Respond with {"version": <settings.DUXE_VERSION>}.
        payload = {'version': settings.DUXE_VERSION}
        return Response(payload)
class StatusViewSet(ScopeMixin, viewsets.ReadOnlyModelViewSet):
    """
    API endpoint that allows statuses to be viewed (read-only)
    """
    # The serializer decorates the base queryset with its eager-loading setup.
    queryset = StatusSerializer.setup_eager_loading(Status.objects.all())
    serializer_class = StatusSerializer
    lookup_field = 'slug'
    # Consumed by ScopeMixin (defined in viewset_mixins, not visible here);
    # presumably maps queryset lookups to URL kwargs - confirm there.
    scope_filters = {
        'project__slug': 'project_slug'
    }
class ColourViewSet(viewsets.ReadOnlyModelViewSet):
    """
    API endpoint that allows colours to be viewed (read-only)
    """
    queryset = Colour.objects.all()
    serializer_class = ColourSerializer
class WorkStreamColourViewSet(viewsets.ReadOnlyModelViewSet):
    """
    API endpoint that allows work stream colours to be viewed (read-only)
    """
    queryset = WorkStreamColour.objects.all()
    serializer_class = ColourSerializer
class StatusColourViewSet(viewsets.ReadOnlyModelViewSet):
    """
    API endpoint that allows status colours to be viewed (read-only)
    """
    queryset = StatusColour.objects.all()
    serializer_class = ColourSerializer
class ProjectViewSet(viewsets.ReadOnlyModelViewSet):
    """
    API endpoint that allows projects to be viewed (read-only)
    """
    # Projects are listed alphabetically by title and looked up by slug.
    queryset = Project.objects.all().order_by('title')
    serializer_class = ProjectSerializer
    lookup_field = 'slug'
class ProjectLogoView(mixins.UpdateModelMixin, viewsets.GenericViewSet):
    """
    API endpoint that allows projects logo to be edited
    """
    # Update-only viewset: UpdateModelMixin is the only action mixin.
    lookup_field = 'slug'
    serializer_class = ProjectLogoSerializer
    queryset = Project.objects.all().order_by('title')
class WorkStreamViewSet(ScopeMixin, viewsets.ReadOnlyModelViewSet):
    """
    API endpoint that allows workstreams to be viewed (read-only)
    """
    # ``objects.all()`` replaces the redundant ``objects.filter().all()``.
    queryset = WorkStreamSerializer.setup_eager_loading(WorkStream.objects.all())
    serializer_class = WorkStreamSerializer
    filter_class = filters.WorkStreamFilter
    lookup_field = 'slug'
    # Consumed by ScopeMixin (defined in viewset_mixins, not visible here);
    # presumably maps queryset lookups to URL kwargs - confirm there.
    scope_filters = {
        "project__slug": "project_slug"
    }
class ActivityFilter(django_filters.rest_framework.FilterSet):
    # Range filters on Activity: start_period -> start_at >= value,
    # end_period -> end_at <= value.
    # NOTE(review): the viewsets in this module use ``filters.ActivityFilter``
    # rather than this class, and NumberFilter is applied to date fields -
    # confirm whether this class is still needed.
    start_period = django_filters.NumberFilter(
        name="start_at", lookup_expr='gte', distinct=True,
    )
    end_period = django_filters.NumberFilter(
        name="end_at", lookup_expr='lte', distinct=True,
    )

    class Meta:
        model = Activity
        fields = ['start_period', 'end_period']
class ActivityViewSet(ScopeMixin, viewsets.ReadOnlyModelViewSet):
    """
    API endpoint that allows activities to be viewed (read-only)
    """
    # Activity.objects is the manager that excludes archived activities.
    queryset = ActivitySerializer.setup_eager_loading(Activity.objects.all())
    serializer_class = ActivitySerializer
    filter_class = filters.ActivityFilter
    lookup_field = 'slug'
class ViewAllActivityViewSet(ActivityViewSet):
    # ActivityViewSet variant scoped by project slug only.
    scope_filters = {
        'work_stream__project__slug': 'project_slug',
    }
class WorkStreamActivityViewSet(ActivityViewSet):
    # ActivityViewSet variant scoped by both project slug and work stream slug.
    scope_filters = dict([
        ('work_stream__project__slug', 'project_slug'),
        ('work_stream__slug', 'workstream_slug'),
    ])
class StrategicPriorityViewSet(ScopeMixin, viewsets.ModelViewSet):
    """
    API endpoint for viewing strategic priorities
    """
    # The serializer decorates the base queryset with its eager-loading setup.
    queryset = StrategicPrioritySerializer.setup_eager_loading(
        StrategicPriority.objects.all()
    )
    serializer_class = StrategicPrioritySerializer
    lookup_field = 'slug'
    scope_filters = {
        "project__slug": "project_slug",
    }
# Users
class UserViewSet(viewsets.ModelViewSet):
    """
    API endpoint for viewing user details
    """
    # The serializer decorates the base queryset with its eager-loading setup.
    queryset = UserSerializer.setup_eager_loading(User.objects.all())
    serializer_class = UserSerializer
class CurrentUserRedirect(LoginRequiredMixin, RedirectView):
    """Temporary redirect to the API detail URL of the logged-in user."""

    permanent = False

    def get_redirect_url(self, *args, **kwargs):
        """Return the ``api:user-detail`` URL for ``request.user``."""
        current_user_id = self.request.user.id
        return reverse('api:user-detail', kwargs={'pk': current_user_id})
# Feedback
class ActivityFeedbackView(LoginRequiredMixin, View):
    """
    Email feedback about an activity to the selected contacts.

    LoginRequiredMixin must come before View in the bases: with the order
    reversed, View.dispatch is found first and the login check never runs.

    POST fields: ``contacts`` (JSON list of usernames), ``feedbackType``
    and ``message``.

    Responses:
        400 - a field is missing, ``contacts`` is not valid JSON, is not a
              list, or is empty.
        404 - the activity or one of the contacts does not exist.
        JSON ``{"success": <send_mail result>}`` otherwise.
    """

    def post(self, request, *args, **kwargs):
        project_slug = kwargs['project_slug']
        activity_slug = kwargs['activity_slug']

        try:
            post_data = request.POST
            contacts = json.loads(post_data['contacts'])
            feedback_type = post_data['feedbackType']
            message = post_data['message']
        except (KeyError, JSONDecodeError):
            return HttpResponseBadRequest()

        try:
            activity = Activity.objects.get(slug=activity_slug, work_stream__project__slug=project_slug)
            # Valid JSON is not necessarily a list; a string would otherwise
            # be iterated character by character.
            if not isinstance(contacts, list):
                raise ValueError('contacts must be a list of usernames')
            recipients = [
                User.objects.get(username=username).email
                for username in contacts
            ]
            if not recipients:
                raise ValueError('Need at least 1 recipient')
        except ObjectDoesNotExist:
            raise Http404()
        except ValueError:
            return HttpResponseBadRequest()

        subject = '[Duxe] [{type}] on Activity "{title}"'.format(type=feedback_type, title=activity.title)
        message = 'The following message was entered on the feedback form for the activity "{activity}":' \
                  '\n---\n{message}\n---\n'.format(activity=activity.title, message=message)
        message += 'View this activity via this link: {scheme}://{base_url}{url}'.format(
            scheme=request.scheme,
            base_url=request.get_host(),
            url=activity.get_duxeboard_url(),
        )

        # The mail is sent in the name of the logged-in user.
        from_email = request.user.email
        success = send_mail(subject, message, from_email, recipients, fail_silently=False)
        return JsonResponse({"success": success})
class TimelineActivityViewSet(viewsets.ReadOnlyModelViewSet):
    # Read-only activities for the timeline, optionally limited to one project.
    # The chain is parenthesised instead of using backslash continuations: a
    # trailing backslash after the last call swallows the following statement.
    queryset = (
        Activity.objects.all()
        .select_related('sponsor', 'work_stream', 'work_stream__project')
        .prefetch_related('contacts', 'values', 'values__metric', 'values__metric__unit')
    )
    serializer_class = TimelineActivitySerializer
    filter_class = filters.ActivityFilter
    lookup_field = 'slug'

    def get_queryset(self):
        # Start from super().get_queryset() rather than self.queryset: the
        # class-level queryset's result cache would otherwise be shared
        # between requests.
        queryset = super().get_queryset()
        project_slug = self.kwargs.get('project_slug')
        if project_slug:
            queryset = queryset.filter(work_stream__project__slug=project_slug)
        return queryset
class ResourceCategoryViewSet(viewsets.ModelViewSet):
    # ModelViewSet over resource categories, prefetching their resources.
    serializer_class = ResourceCategorySerializer
    queryset = ResourceCategory.objects.all().prefetch_related('resources')
class ResourceViewSet(viewsets.ModelViewSet):
    # ModelViewSet over resources, prefetching their activities.
    serializer_class = ResourceSerializer
    queryset = Resource.objects.all().prefetch_related('activities')
class ActivityResourceViewSet(mixins.ListModelMixin, viewsets.GenericViewSet):
    # List-only viewset: every resource category, each with only the
    # resources attached to the activity named in the URL.
    serializer_class = ActivityResourcesCategorySerializer

    def get_queryset(self):
        activity_slug = self.kwargs.get('activity_slug')
        # Restrict the prefetched ``resources`` to this activity.
        resources_of_activity = Prefetch(
            'resources',
            queryset=Resource.objects.filter(activities__slug=activity_slug),
        )
        return ResourceCategory.objects.all().prefetch_related(resources_of_activity)
class UnitViewSet(viewsets.ModelViewSet):
    # ModelViewSet over all units.
    serializer_class = UnitSerializer
    queryset = Unit.objects.all()
class MetricViewSet(viewsets.ModelViewSet):
    # ModelViewSet over all metrics.
    serializer_class = MetricSerializer
    queryset = Metric.objects.all()
class ValueViewSet(viewsets.ModelViewSet):
    # Values are written with ValueSerializer but always rendered with
    # ValueListRetrieveSerializer (list, retrieve and the update response).
    queryset = Value.objects.all().select_related('metric', 'metric__unit')

    def update(self, request, *args, **kwargs):
        # Run the standard update, then re-fetch and answer in the read format.
        super().update(request, *args, **kwargs)
        refreshed = self.get_object()
        return Response(ValueListRetrieveSerializer(refreshed).data)

    def get_serializer_class(self):
        if self.action in ('list', 'retrieve'):
            return ValueListRetrieveSerializer
        return ValueSerializer
class VariableViewSet(viewsets.ModelViewSet):
    # ModelViewSet over all variables.
    serializer_class = VariableSerializer
    queryset = Variable.objects.all()
class ExpressionViewSet(viewsets.ModelViewSet):
    # ModelViewSet over expressions, prefetching their variables and the
    # nested expressions (including each wrapper's own expression).
    serializer_class = ExpressionSerializer
    queryset = Expression.objects.all().prefetch_related(
        'variables', 'expressions', 'expressions__expression',
    )
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment