Last active
November 2, 2018 14:47
-
-
Save nazarred/8c51a4b59d13591399e5d4bdf8a7d7c9 to your computer and use it in GitHub Desktop.
examples
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from __future__ import unicode_literals | |
from datetime import date, timedelta | |
from decimal import Decimal as D | |
from django.utils import timezone | |
from apps.applications.openexchangerates import get_or_update_rates_for_date | |
from apps.pricing.rounding_tools import nice_price_rounding_country | |
def D_FROM_FLOAT(x): | |
""" | |
converts float to Decimal, by converting it to string first and only then | |
to Decimal. Unfortunately in python 2.6+ behavior of Decimal has changed | |
and instead of raising TypeError when trying to convert float to Decimal | |
it now just converts it with all float "goodness". Example: | |
>>> Decimal(0.3) * 3 | |
Decimal('0.8999999999999999666933092612') | |
>>> Decimal("0.3") * 3 | |
Decimal('0.9') | |
""" | |
return D(str(x)) | |
def calculate_ppp_rate(ppp, percentage): | |
# reverse the percentage | |
# so we can have slider that goes 1-100 but we need 100-x for calculations | |
percentage = D("100.00") - percentage | |
percentage = percentage / D("100.00") | |
if ppp > 1: | |
ppp_diff = D(str(ppp - 1)) | |
else: | |
ppp_diff = D(str(1 - ppp)) | |
ppp_diff = ppp_diff * percentage | |
if ppp > 1: | |
return ppp - ppp_diff | |
return ppp + ppp_diff | |
def calculate_price_matrix( | |
for_countries, start_price, start_country, | |
exchange_rates_date=date.today(), | |
enable_ppp=False, ppp_percentage=D(0), | |
ppp_limit='both', ppp_increase_percentage=None, | |
nice_prices=False, rounding_method=round | |
): | |
""" | |
Takes start_price (Decimal) | |
and start_currency (string of 3 letter iso code of the currency) | |
returns dictionary with all other available currencies. | |
Optionally use Purchasing Power Parity to get more "real" value. | |
Uses fancy algorithim to create nice, culturally aware prices. | |
(for example 1.99 instead of 2.00) | |
(1.95 for canada because they don't have 1,2¢ coins, etc.) | |
:rounding_method: - should be a callable, can be `round`, `math.floor` or | |
`math.ceil` | |
""" | |
if ppp_limit == 'down': | |
ppp_increase = False | |
# ppp_decrease = True | |
else: # 'both', 'limit_up' | |
ppp_increase = True | |
# ppp_decrease = True | |
if not exchange_rates_date: | |
exchange_rates_date = date.today() | |
exchange_rates = get_or_update_rates_for_date( | |
rates_date=exchange_rates_date | |
) | |
# convert input price to USD, so we need to know only exchante rates to USD | |
# (no need for exchange rates like GBP → PLN) | |
# + PPP is defined for USD. | |
default_country = for_countries[start_country] | |
exchange_rate_to_usd = exchange_rates.openexchangerates_dump[ | |
default_country.currency_iso_code | |
] | |
# normalize default price | |
default_ppp = calculate_ppp_rate(default_country.ppp, ppp_percentage) | |
start_price = start_price / default_ppp | |
price_in_usd = start_price / D(exchange_rate_to_usd) | |
OUTPUT = {} | |
if ppp_increase_percentage: | |
ppp_increase_percentage = ppp_increase_percentage / D(100) | |
ppp_max = D(1) + ppp_increase_percentage | |
# iterate over all available currencies, and calculate output price for | |
# that currency. | |
for country_iso_code, country in for_countries.iteritems(): | |
ppp = calculate_ppp_rate(country.ppp, ppp_percentage) | |
if not ppp_increase: | |
if ppp > 1: | |
ppp = 1 | |
if ppp_limit == 'limit_up' and ppp_increase_percentage: | |
if ppp > ppp_max: | |
ppp = ppp_max | |
rounding = D(country.round_to) | |
# multiply the price by exchange rate to usd to get a value in | |
# USD (because PPP is defined for USD only. | |
exchange_rate_to_usd = exchange_rates.openexchangerates_dump[ | |
country.currency_iso_code | |
] | |
price = price_in_usd * D(exchange_rate_to_usd) | |
# apply parity but only if enable_ppp is set to true, and | |
# currency is different than starting currency | |
# (don't apply parity to the current currency, aka leave current | |
# currency as is) | |
if enable_ppp: | |
price = price * ppp | |
price = price.quantize(D('0.01')) | |
if nice_prices: | |
result = nice_price_rounding_country(price, country.iso_code) | |
else: | |
result = str(price.quantize(rounding)) | |
# invariant: OUTPUT["GB"] = {"price": "0.59"} | |
OUTPUT[country.iso_code] = {"price": result} | |
return OUTPUT | |
def create_sample_price_matrix( | |
original_matrix, enable_ppp=True, psychological_pricing=False, | |
mark_live=True | |
): | |
old_object_order = original_matrix.order | |
clone = original_matrix.clone() | |
clone.enable_ppp = enable_ppp | |
clone.psychological_pricing = psychological_pricing | |
clone.recalculate(clear_locked_prices=True) | |
if mark_live: | |
now = timezone.now() | |
clone.last_exported = now - timedelta(seconds=1) | |
clone.date_live = now | |
clone.save() | |
clone.to(old_object_order + 1) | |
return clone | |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from flask_wtf import FlaskForm | |
from wtforms import StringField, PasswordField, SubmitField, BooleanField | |
from wtforms.validators import DataRequired, Length, EqualTo, Email, ValidationError | |
from project.models import User | |
class RegisterForm(FlaskForm): | |
email = StringField('Email', validators=[DataRequired(), Email(), Length(min=6, max=100)]) | |
password = PasswordField('Password', validators=[DataRequired(), Length(min=6, max=40)]) | |
confirm = PasswordField('Confirm Password', validators=[DataRequired(), EqualTo('password')]) | |
submit = SubmitField('Register') | |
def validate_email(self, email): | |
user = User.query.filter_by(email=email.data).first() | |
if user is not None: | |
raise ValidationError('Please use a different email address.') | |
class LoginForm(FlaskForm): | |
email = StringField('Email', validators=[DataRequired(), Email(), Length(min=6, max=100)]) | |
password = PasswordField('Password', validators=[DataRequired()]) | |
remember_me = BooleanField('Remember Me') | |
submit = SubmitField('Login') | |
# coding: utf-8 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from marshmallow import (Schema, fields, validates, ValidationError,) | |
from .mail_chimp_handlers import (api_key_valid,) | |
class Contact(Schema): | |
company = fields.String(required=True) | |
address1 = fields.String(required=True) | |
city = fields.String(required=True) | |
state = fields.String(required=True) | |
zip = fields.String(required=True) | |
country = fields.String(required=True) | |
class CampaignDefaults(Schema): | |
from_name = fields.String(required=True) | |
from_email = fields.String(required=True) | |
subject = fields.String(required=True) | |
language = fields.String(required=True) | |
class Member(Schema): | |
STATUSES = ['subscribed', 'unsubscribed', 'cleaned'] | |
email_address = fields.Email(required=True) | |
status = fields.String(required=True) | |
@validates('status') | |
def validates_status(self, data): | |
if data not in self.STATUSES: | |
raise ValidationError("'status' field should be 'subscribed', 'unsubscribed' or 'cleaned'") | |
class EmailList(Schema): | |
name = fields.String(required=True) | |
permission_reminder = fields.String(required=True) | |
contact = fields.Nested(Contact) | |
campaign_defaults = fields.Nested(CampaignDefaults) | |
members = fields.Nested(Member, many=True) | |
class APIKeySerializer(Schema): | |
api_key = fields.String(required=True) | |
@validates('api_key') | |
def validates_api_key(self, api_key): | |
if not api_key: | |
raise ValidationError("The field should not be an empty string") | |
valid, message = api_key_valid(api_key) | |
if not valid: | |
raise ValidationError(message) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from project import db, bcrypt | |
from datetime import datetime | |
class User(db.Model): | |
""" | |
Class that represents a user of the application | |
The following attributes of a user are stored in this table: | |
email address | |
password (hashed using Bcrypt) | |
authenticated flag (indicates if a user is logged in or not) | |
date that the user registered on | |
""" | |
__tablename__ = 'users' | |
id = db.Column(db.Integer, primary_key=True, autoincrement=True) | |
email = db.Column(db.String, unique=True, nullable=False) | |
hashed_password = db.Column(db.Binary(60), nullable=False) | |
authenticated = db.Column(db.Boolean, default=False) | |
registered_on = db.Column(db.DateTime, nullable=True) | |
role = db.Column(db.String, default='user') | |
def __init__(self, email, plaintext_password, role='user'): | |
self.email = email | |
self.hashed_password = bcrypt.generate_password_hash(plaintext_password) | |
self.authenticated = False | |
self.registered_on = datetime.now() | |
self.role = role | |
def set_password(self, plaintext_password): | |
self.hashed_password = bcrypt.generate_password_hash(plaintext_password) | |
def is_correct_password(self, plaintext_password): | |
return bcrypt.check_password_hash(self.hashed_password, plaintext_password) | |
@property | |
def is_authenticated(self): | |
"""Return True if the user is authenticated.""" | |
return self.authenticated | |
@property | |
def is_active(self): | |
"""Always True, as all users are active.""" | |
return True | |
@property | |
def is_anonymous(self): | |
"""Always False, as anonymous users aren't supported.""" | |
return False | |
def get_id(self): | |
"""Return the id of a user to satisfy Flask-Login's requirements.""" | |
return str(self.id) | |
def __repr__(self): | |
return '<User {}>'.format(self.email) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from django.conf import settings | |
from django.core.urlresolvers import reverse | |
from django.core.validators import RegexValidator | |
from django.db import models | |
from django.db.models import Count | |
from django.utils.translation import gettext_lazy as _ | |
from django.core.exceptions import ValidationError | |
from django.utils.functional import cached_property | |
import reversion | |
from reversion.models import Version | |
from autoslug import AutoSlugField | |
from djchoices import DjangoChoices, ChoiceItem | |
from ordered_model.models import OrderedModel | |
from apps.common.models import TimeStampedModel | |
from .forms import StatusForm | |
from apps.users.models import User | |
from apps.utils import mixins, expression_handler | |
from dirtyfields import DirtyFieldsMixin | |
# functions for populating slugs | |
def slug_period_populator(instance): | |
return "%s %s" % (instance.name, instance.designator) | |
def slug_title_populator(instance): | |
return instance.title | |
# Upper case char field, for colour hex codes | |
# See http://stackoverflow.com/questions/19498740/how-can-i-make-all-charfield-in-uppercase-direct-in-model | |
class UpperCaseCharField(models.CharField): | |
def __init__(self, *args, **kwargs): | |
super(UpperCaseCharField, self).__init__(*args, **kwargs) | |
def pre_save(self, model_instance, add): | |
value = getattr(model_instance, self.attname, None) | |
if value: | |
value = value.upper() | |
setattr(model_instance, self.attname, value) | |
return value | |
else: | |
return super(UpperCaseCharField, self).pre_save(model_instance, add) | |
class PeriodSystems(DjangoChoices): | |
Calendar = ChoiceItem('MO') | |
Australian_FY = ChoiceItem('AU') | |
@staticmethod | |
def name_designators(designator): | |
if designator == PeriodSystems.Calendar: | |
return settings.MONTH_NAMES | |
else: | |
return settings.QUARTER_NAMES | |
""" | |
On saving a project, signal listeners in signals.py with create required statuses and colours | |
""" | |
class ProjectManager(models.Manager): | |
def get_default_project(self): | |
return self.order_by('order').first() | |
class Project(TimeStampedModel, OrderedModel): | |
objects = ProjectManager() | |
title = models.CharField(max_length=30, blank=False) | |
slug = AutoSlugField(populate_from=slug_title_populator, unique=True) | |
quarter_system = models.CharField(max_length=2, choices=PeriodSystems.choices, | |
validators=[PeriodSystems.validator], | |
default=PeriodSystems.Australian_FY) | |
updated_by = models.ForeignKey(User) | |
logo = models.ImageField(upload_to='logo/', blank=True, null=True) | |
def __str__(self): | |
return self.title | |
def get_logo(self): | |
if self.logo: | |
return self.logo.url | |
else: | |
return "/static/images/default_logo.png" | |
class Meta: | |
ordering = ('order',) | |
class Colour(TimeStampedModel, OrderedModel): | |
value = UpperCaseCharField(max_length=6, validators=[RegexValidator(regex="[0-9ABCDEF]{6}")]) | |
class StatusColour(Colour): | |
pass | |
class WorkStreamColourManager(models.Manager): | |
def get_next_colour(self): | |
return self.annotate(times_used=Count('workstream')).order_by('times_used', 'order')[0] | |
class WorkStreamColour(Colour): | |
objects = WorkStreamColourManager() | |
class Status(TimeStampedModel, OrderedModel): | |
title = models.CharField(max_length=30) | |
slug = AutoSlugField(populate_from=slug_title_populator, unique_with='project') | |
colour = models.ForeignKey(StatusColour) | |
project = models.ForeignKey(Project) | |
order_with_respect_to = 'project' | |
def __str__(self): | |
return self.title | |
class Meta: | |
ordering = ('order',) | |
class StrategicPriority(TimeStampedModel, OrderedModel): | |
title = models.CharField(max_length=30) | |
slug = AutoSlugField(populate_from=slug_title_populator, unique_with='project') | |
project = models.ForeignKey(Project) | |
order_with_respect_to = 'project' | |
def __str__(self): | |
return self.title | |
class Meta: | |
ordering = ('order',) | |
class WorkStreamManager(models.Manager): | |
def get_default_workstream(self, project_slug): | |
return self.filter(project__slug=project_slug).first() | |
@reversion.register(fields=['status']) | |
class WorkStream(mixins.RevisionHelperMixin, DirtyFieldsMixin, TimeStampedModel, OrderedModel): | |
ON_TRACK = 'on-track' | |
MONITOR = 'monitor' | |
STOPINTERVENE = 'stopintervene' | |
ON_TRACK_THRESHOLD = 60 | |
MONITOR_THRESHOLD = 30 | |
STOPINTERVENE_THRESHOLD = 0 | |
STATUSES = ( | |
(ON_TRACK, 'On-Track'), | |
(MONITOR, 'Monitor'), | |
(STOPINTERVENE, 'Stop/Intervene') | |
) | |
objects = WorkStreamManager() | |
project = models.ForeignKey(Project) | |
title = models.CharField(max_length=30) | |
slug = AutoSlugField(populate_from=slug_title_populator, unique=True) | |
colour = models.ForeignKey(WorkStreamColour) | |
status = models.DecimalField(default=0, max_digits=5, decimal_places=2) | |
order_with_respect_to = 'project' | |
class Meta: | |
ordering = ('order',) | |
def __str__(self): | |
return self.title | |
def save(self, *args, **kwargs): | |
if not hasattr(self, 'colour'): | |
self.colour = WorkStreamColour.objects.get_next_colour() | |
dirty_fields = self.get_dirty_fields() | |
if 'status' in dirty_fields.keys(): | |
with reversion.create_revision(): | |
super().save(*args, **kwargs) | |
else: | |
super().save(*args, **kwargs) | |
def display_status(self): | |
if self.status >= self.ON_TRACK_THRESHOLD: | |
return self.ON_TRACK | |
elif self.MONITOR_THRESHOLD <= self.status < self.ON_TRACK_THRESHOLD: | |
return self.MONITOR | |
else: | |
return self.STOPINTERVENE | |
@property | |
def previous_status(self): | |
previous_version = self.get_previous_version() | |
return previous_version.field_dict['status'] if previous_version else None | |
def set_status(self): | |
status_percent = self.get_status_percent() | |
self.status = status_percent | |
self.save() | |
def get_status_percent(self): | |
activities_amount = self.get_activities().count() | |
if activities_amount: | |
on_track_activities_amount = self.get_activities_by_status('on-track').count() | |
on_track_activities_percent = on_track_activities_amount / activities_amount * 100 | |
return on_track_activities_percent | |
return 0 | |
def get_activities(self): | |
return self.activity_set.all() | |
def get_activities_by_status(self, status): | |
activities = self.get_activities() | |
activities_by_status = activities.filter(current_status=status) | |
return activities_by_status | |
def get_activities_amount_by_status(self): | |
statuses_list = list() | |
for value, display_name in Activity.STATUSES: | |
status_dict = dict() | |
status_dict['amount'] = self.get_activities_by_status(value).count() | |
status_dict['display_name'] = display_name | |
status_dict['value'] = value | |
statuses_list.append(status_dict) | |
return statuses_list | |
def get_activities_amount(self): | |
return self.get_activities().count() | |
ACTIVITY_METRICS_TRACKED = ['status', 'current_status'] | |
class ActivityManager(models.Manager): | |
def __init__(self, *args, **kwargs): | |
self.archive = kwargs.pop('archive', False) | |
super(ActivityManager, self).__init__(*args, **kwargs) | |
def get_queryset(self): | |
if self.archive: | |
return super(ActivityManager, self).get_queryset() | |
return super(ActivityManager, self).get_queryset().filter(archive=False) | |
@reversion.register(fields=ACTIVITY_METRICS_TRACKED) | |
class Activity(mixins.RevisionHelperMixin, DirtyFieldsMixin, TimeStampedModel): | |
FIELDS_TO_CHECK = ['current_status'] | |
ON_TRACK = 'on-track' | |
MONITOR = 'monitor' | |
STOPINTERVENE = 'stopintervene' | |
PLANNED_ACTIVITY = 'planned_activity' | |
STATUSES = ( | |
(ON_TRACK, 'On-Track'), | |
(MONITOR, 'Monitor'), | |
(STOPINTERVENE, 'Stop/Intervene'), | |
(PLANNED_ACTIVITY, 'Planned Activity'), | |
) | |
title = models.CharField(max_length=30) | |
slug = AutoSlugField(populate_from=slug_title_populator, unique=True) | |
description = models.TextField() | |
work_stream = models.ForeignKey(WorkStream) | |
strategic_priorities = models.ManyToManyField(StrategicPriority, blank=True) | |
status = models.ForeignKey(Status) | |
completed = models.BooleanField(default=False) | |
contacts = models.ManyToManyField(User) | |
sponsor = models.ForeignKey(User, related_name='activities', blank=True, null=True) | |
archive = models.BooleanField(default=False) | |
current_status = models.CharField(max_length=30, default=ON_TRACK, choices=STATUSES) | |
start_at = models.DateField() | |
end_at = models.DateField() | |
objects = ActivityManager() | |
all_objects = ActivityManager(archive=True) | |
def __str__(self): | |
return self.title | |
def save(self, force_insert=False, force_update=False, using=None, | |
update_fields=None): | |
dirty_fields = self.get_dirty_fields() | |
if 'current_status' in dirty_fields.keys(): | |
with reversion.create_revision(): | |
super().save(force_insert, force_update, using, update_fields) | |
self.work_stream.set_status() | |
else: | |
super().save(force_insert, force_update, using, update_fields) | |
def get_duxeboard_url(self): | |
base_url = reverse('dashboard:project-view', kwargs={'project': self.work_stream.project.slug}) | |
react_router_url = '#/view/_all' | |
query_string = 'act[slug]={activity_slug}&act[workstream_slug]={activity_work_stream_slug}'.format( | |
activity_slug=self.slug, | |
activity_work_stream_slug=self.work_stream.slug | |
) | |
return '{base_url}{react_router_url}?{query_string}'.format(base_url=base_url, | |
react_router_url=react_router_url, | |
query_string=query_string) | |
def get_edit_url(self): | |
return reverse('cms:viewall-activity-detail', | |
kwargs={'project': self.work_stream.project.slug, 'activity': self.slug}) | |
def get_status_form(self): | |
form = StatusForm(qs=Status.objects.filter(project=self.work_stream.project), data={ | |
'status': self.status.pk | |
}) | |
return form | |
def get_revision_history(self, start=None, end=None): | |
# Mapping of revision id to metrics and their values | |
history = {} | |
for value in self.values.all(): | |
metric_version = value.get_versions(start, end) | |
for version in metric_version: | |
revision = version.revision | |
revision_id = revision.id | |
if revision_id not in history: | |
history[revision_id] = { | |
'date': revision.date_created.strftime("%Y-%m-%d %H:%M:%S %Z"), | |
'username': revision.user.username if revision.user is not None else '[System]' | |
} | |
history[revision_id][value.metric.label] = version.field_dict['value'] | |
activity_version = self.get_versions(start, end) | |
for status_version in activity_version: | |
revision = status_version.revision | |
revision_id = revision.id | |
status = Status.objects.get(id=status_version.field_dict['status_id']) | |
if revision_id not in history: | |
history[revision_id] = { | |
'date': revision.date_created.strftime("%Y-%m-%d %H:%M:%S %Z"), | |
'username': revision.user.username if revision.user is not None else '[System]' | |
} | |
history[revision_id]['status'] = status.title | |
return history | |
def get_status_colour(self): | |
return settings.ACTIVITY_DEFAULT_STATUSES[self.current_status]['COLOUR'] | |
class Unit(TimeStampedModel): | |
name = models.CharField(max_length=15) | |
prefix = models.CharField(max_length=15, null=True, blank=True) | |
suffix = models.CharField(max_length=15, null=True, blank=True) | |
def __str__(self): | |
return self.name | |
class Metric(TimeStampedModel): | |
unit = models.ForeignKey(Unit) | |
name = models.CharField(max_length=255) | |
formula = models.CharField(max_length=255, blank=True) | |
min_value = models.DecimalField(null=True, max_digits=19, decimal_places=10) | |
max_value = models.DecimalField(null=True, max_digits=19, decimal_places=10) | |
decimal_places = models.SmallIntegerField(null=True) | |
rollup_metric = models.BooleanField(default=True) | |
def __str__(self): | |
return self.name | |
@classmethod | |
def get_metrics_names(cls): | |
return [name.lower() for name in cls.objects.all().order_by('-name').values_list('name', flat=True)] | |
@classmethod | |
def get_metrics_names_and_ids(cls): | |
return {name.lower(): id for name, id in cls.objects.all().values_list('name', 'id')} | |
@reversion.register(fields=['value']) | |
class Value(mixins.RevisionHelperMixin, TimeStampedModel): | |
metric = models.ForeignKey(Metric, related_name='values') | |
activity = models.ForeignKey(Activity, related_name='values') | |
value = models.DecimalField(max_digits=19, decimal_places=10) | |
def save(self, force_insert=False, force_update=False, using=None, | |
update_fields=None): | |
with reversion.create_revision(): | |
super().save(force_insert, force_update, using, update_fields) | |
def clean(self): | |
super().clean() | |
if self.metric.max_value is not None and self.metric.max_value < self.value: | |
raise ValidationError(_('"value" field should be smaller than "max_value" field of Metric model')) | |
if self.metric.min_value is not None and self.metric.min_value > self.value: | |
raise ValidationError(_('"value" field should be greater than "min_value" field of Metric model')) | |
@property | |
def previous_value(self): | |
previous_version = self.get_previous_version() | |
return previous_version.field_dict['value'] if previous_version else None | |
class ResourceCategory(TimeStampedModel): | |
name = models.CharField(max_length=40, unique=True) | |
class Meta: | |
ordering = ('name',) | |
def __str__(self): | |
return self.name | |
class Resource(TimeStampedModel): | |
file_name = models.CharField(max_length=40) | |
resource_category = models.ForeignKey(ResourceCategory, related_name='resources') | |
file = models.FileField(upload_to='resources/') | |
activities = models.ManyToManyField(Activity, related_name='resources') | |
def __str__(self): | |
return self.file_name | |
class BaseFormulaModel(models.Model): | |
def __str__(self): | |
return '{name}={value}'.format(name=self.name, value=self.value) | |
class Meta: | |
abstract = True | |
class Variable(BaseFormulaModel): | |
name = models.CharField(max_length=30, unique=True) # TODO/FIXME | |
value = models.DecimalField(max_digits=19, decimal_places=10) | |
class VariableExpression(BaseFormulaModel): | |
expression = models.OneToOneField('core.Expression') | |
@property | |
def name(self): | |
return self.expression.name | |
@property | |
def value(self): | |
return self.expression.value | |
class Expression(BaseFormulaModel): | |
name = models.CharField(max_length=30, unique=True) | |
expression_string = models.CharField(max_length=255) | |
variables = models.ManyToManyField(Variable, related_name='variables', blank=True) | |
expressions = models.ManyToManyField(VariableExpression, related_name='variable_expressions', blank=True) | |
def save(self, force_insert=False, force_update=False, using=None, | |
update_fields=None): | |
created = True if not self.id else False | |
super().save(force_insert, force_update, using, update_fields) | |
if created: | |
VariableExpression.objects.create(expression=self) | |
@cached_property | |
def value(self): | |
variables = self.variables.all() | |
expressions = self.expressions.all() | |
e_h = expression_handler.ExpressionHandler(self.expression_string, variables, expressions) | |
value = e_h.calculate() | |
return value |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json | |
from json import JSONDecodeError | |
from django.db.models import Prefetch | |
from django.core.exceptions import ObjectDoesNotExist | |
from django.core.mail import send_mail | |
from django.http import Http404 | |
from django.http import JsonResponse, HttpResponseBadRequest | |
from django.urls import reverse | |
from django.views import View | |
from django.contrib.auth.mixins import LoginRequiredMixin | |
from django.views.generic import RedirectView | |
from django.conf import settings | |
from rest_framework.views import APIView | |
from rest_framework.response import Response | |
from rest_framework import viewsets | |
from rest_framework import mixins | |
from rest_framework import permissions | |
from rest_framework import filters as rest_framework_filters | |
import django_filters | |
from apps.core.serializers import ( | |
ProjectSerializer, ProjectLogoSerializer, WorkStreamSerializer, ActivitySerializer, | |
StatusSerializer, ColourSerializer, StrategicPrioritySerializer, TimelineActivitySerializer, | |
ResourceCategorySerializer, ResourceSerializer, ActivityResourcesCategorySerializer, UnitSerializer, | |
MetricSerializer, ValueSerializer, ValueListRetrieveSerializer, VariableSerializer, ExpressionSerializer | |
) | |
from apps.core.models import ( | |
Project, WorkStream, Activity, | |
Status, Colour, WorkStreamColour, | |
StatusColour, StrategicPriority, ResourceCategory, | |
Resource, Unit, Metric, Value, Variable, Expression | |
) | |
from apps.users.models import User | |
from apps.users.serializers import UserSerializer | |
from .viewset_mixins import ScopeMixin | |
from . import filters | |
class DuxeVersionView(APIView): | |
permission_classes = (permissions.IsAuthenticated,) | |
def get(self, request): | |
version = settings.DUXE_VERSION | |
return Response({'version': version}) | |
class StatusViewSet(ScopeMixin, viewsets.ReadOnlyModelViewSet): | |
""" | |
API endpoint that allows statuses to be viewed or edited | |
""" | |
queryset = Status.objects.all() | |
queryset = StatusSerializer.setup_eager_loading(queryset) | |
serializer_class = StatusSerializer | |
lookup_field = 'slug' | |
scope_filters = { | |
'project__slug': 'project_slug' | |
} | |
class ColourViewSet(viewsets.ReadOnlyModelViewSet): | |
""" | |
API endpoint that allows colours to be viewed or edited | |
""" | |
queryset = Colour.objects.all() | |
serializer_class = ColourSerializer | |
class WorkStreamColourViewSet(viewsets.ReadOnlyModelViewSet): | |
""" | |
API endpoint that allows colours to be viewed or edited | |
""" | |
queryset = WorkStreamColour.objects.all() | |
serializer_class = ColourSerializer | |
class StatusColourViewSet(viewsets.ReadOnlyModelViewSet): | |
""" | |
API endpoint that allows colours to be viewed or edited | |
""" | |
queryset = StatusColour.objects.all() | |
serializer_class = ColourSerializer | |
class ProjectViewSet(viewsets.ReadOnlyModelViewSet): | |
""" | |
API endpoint that allows projects to be viewed or edited | |
""" | |
queryset = Project.objects.all().order_by('title') | |
serializer_class = ProjectSerializer | |
lookup_field = 'slug' | |
class ProjectLogoView(mixins.UpdateModelMixin, viewsets.GenericViewSet): | |
""" | |
API endpoint that allows projects logo to be edited | |
""" | |
queryset = Project.objects.all().order_by('title') | |
serializer_class = ProjectLogoSerializer | |
lookup_field = 'slug' | |
class WorkStreamViewSet(ScopeMixin, viewsets.ReadOnlyModelViewSet): | |
""" | |
API endpoint that allows workstreams to be viewed or edited | |
""" | |
queryset = WorkStream.objects.filter().all() | |
queryset = WorkStreamSerializer.setup_eager_loading(queryset) | |
serializer_class = WorkStreamSerializer | |
filter_class = filters.WorkStreamFilter | |
lookup_field = 'slug' | |
scope_filters = { | |
"project__slug": "project_slug" | |
} | |
class ActivityFilter(django_filters.rest_framework.FilterSet): | |
start_period = django_filters.NumberFilter(name="start_at", lookup_expr='gte', distinct=True) | |
end_period = django_filters.NumberFilter(name="end_at", lookup_expr='lte', distinct=True) | |
class Meta: | |
model = Activity | |
fields = ['start_period', 'end_period'] | |
class ActivityViewSet(ScopeMixin, viewsets.ReadOnlyModelViewSet): | |
""" | |
API endpoint that allows activities to be viewed and edited | |
""" | |
queryset = Activity.objects.all() | |
queryset = ActivitySerializer.setup_eager_loading(queryset) | |
serializer_class = ActivitySerializer | |
filter_class = filters.ActivityFilter | |
lookup_field = 'slug' | |
class ViewAllActivityViewSet(ActivityViewSet): | |
scope_filters = { | |
'work_stream__project__slug': 'project_slug' | |
} | |
class WorkStreamActivityViewSet(ActivityViewSet): | |
scope_filters = { | |
'work_stream__project__slug': 'project_slug', | |
'work_stream__slug': 'workstream_slug', | |
} | |
class StrategicPriorityViewSet(ScopeMixin, viewsets.ModelViewSet): | |
""" | |
API endpoint for viewing strategic priorities | |
""" | |
queryset = StrategicPriority.objects.all() | |
queryset = StrategicPrioritySerializer.setup_eager_loading(queryset) | |
serializer_class = StrategicPrioritySerializer | |
lookup_field = 'slug' | |
scope_filters = { | |
"project__slug": "project_slug" | |
} | |
# Users | |
class UserViewSet(viewsets.ModelViewSet): | |
""" | |
API endpoint for viewing user details | |
""" | |
queryset = User.objects.all() | |
queryset = UserSerializer.setup_eager_loading(queryset) | |
serializer_class = UserSerializer | |
class CurrentUserRedirect(LoginRequiredMixin, RedirectView): | |
permanent = False | |
def get_redirect_url(self, *args, **kwargs): | |
return reverse('api:user-detail', kwargs={ | |
'pk': self.request.user.id | |
}) | |
# Feedback | |
class ActivityFeedbackView(View, LoginRequiredMixin): | |
def post(self, request, *args, **kwargs): | |
project_slug = kwargs['project_slug'] | |
activity_slug = kwargs['activity_slug'] | |
try: | |
post_data = request.POST | |
contacts = json.loads(post_data['contacts']) | |
feedback_type = post_data['feedbackType'] | |
message = post_data['message'] | |
except (KeyError, JSONDecodeError): | |
return HttpResponseBadRequest() | |
try: | |
activity = Activity.objects.get(slug=activity_slug, work_stream__project__slug=project_slug) | |
recipients = [] | |
for username in contacts: | |
user = User.objects.get(username=username) | |
recipients.append(user.email) | |
if len(recipients) is 0: | |
raise ValueError('Need at least 1 recipient') | |
except ObjectDoesNotExist: | |
raise Http404() | |
except ValueError: | |
return HttpResponseBadRequest() | |
subject = '[Duxe] [{type}] on Activity "{title}"'.format(type=feedback_type, title=activity.title) | |
message = 'The following message was entered on the feedback form for the activity "{activity}":' \ | |
'\n---\n{message}\n---\n'.format(activity=activity.title, message=message) | |
message += 'View this activity via this link: {scheme}://{base_url}{url}'.format(scheme=request.scheme, | |
base_url=request.get_host(), | |
url=activity.get_duxeboard_url()) | |
from_email = request.user.email | |
success = send_mail(subject, message, from_email, recipients, fail_silently=False) | |
return JsonResponse({"success": success}) | |
class TimelineActivityViewSet(viewsets.ReadOnlyModelViewSet): | |
queryset = Activity.objects.all()\ | |
.select_related('sponsor', 'work_stream', 'work_stream__project')\ | |
.prefetch_related('contacts', 'values', 'values__metric', 'values__metric__unit')\ | |
serializer_class = TimelineActivitySerializer | |
filter_class = filters.ActivityFilter | |
lookup_field = 'slug' | |
def get_queryset(self): | |
project_slug = self.kwargs.get('project_slug') | |
if project_slug: | |
return self.queryset.filter(work_stream__project__slug=project_slug) | |
return self.queryset | |
class ResourceCategoryViewSet(viewsets.ModelViewSet): | |
queryset = ResourceCategory.objects.all().prefetch_related('resources') | |
serializer_class = ResourceCategorySerializer | |
class ResourceViewSet(viewsets.ModelViewSet): | |
queryset = Resource.objects.all().prefetch_related('activities') | |
serializer_class = ResourceSerializer | |
class ActivityResourceViewSet(mixins.ListModelMixin, viewsets.GenericViewSet): | |
serializer_class = ActivityResourcesCategorySerializer | |
def get_queryset(self): | |
activity_slug = self.kwargs.get('activity_slug') | |
return ResourceCategory.objects.all().prefetch_related( | |
Prefetch( | |
'resources', | |
queryset=Resource.objects.filter(activities__slug=activity_slug) | |
) | |
) | |
class UnitViewSet(viewsets.ModelViewSet): | |
queryset = Unit.objects.all() | |
serializer_class = UnitSerializer | |
class MetricViewSet(viewsets.ModelViewSet): | |
queryset = Metric.objects.all() | |
serializer_class = MetricSerializer | |
class ValueViewSet(viewsets.ModelViewSet): | |
queryset = Value.objects.all().select_related('metric', 'metric__unit') | |
def update(self, request, *args, **kwargs): | |
super().update(request, *args, **kwargs) | |
instance = self.get_object() | |
serializer = ValueListRetrieveSerializer(instance) | |
return Response(serializer.data) | |
def get_serializer_class(self): | |
if self.action == 'list' or self.action == 'retrieve': | |
return ValueListRetrieveSerializer | |
return ValueSerializer | |
class VariableViewSet(viewsets.ModelViewSet): | |
queryset = Variable.objects.all() | |
serializer_class = VariableSerializer | |
class ExpressionViewSet(viewsets.ModelViewSet): | |
queryset = Expression.objects.all().prefetch_related('variables', 'expressions', 'expressions__expression') | |
serializer_class = ExpressionSerializer |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment