Skip to content

Instantly share code, notes, and snippets.

@netpastor
Created September 3, 2019 13:10
Show Gist options
  • Save netpastor/b0549342d880c1147e13bd6f5ed200cd to your computer and use it in GitHub Desktop.
Save netpastor/b0549342d880c1147e13bd6f5ed200cd to your computer and use it in GitHub Desktop.
# -*- coding: UTF-8 -*-
import logging
from datetime import datetime, timedelta, date
import pytz
from django.contrib.auth.models import User
from django.core.exceptions import ValidationError
from django.db import models
from django.db.models import Count, Sum
from django.db.models.expressions import Case, When, F, Value
from django_encrypted_password_field import EncryptedPasswordField
from django_mysql.models import QuerySetMixin
from memoize import memoize
from pytz.exceptions import UnknownTimeZoneError
from solo.models import SingletonModel
# Module-level logger bound to the 'django' logger name.
logger = logging.getLogger('django')

# Product names known to this module.
product_names = [
    'display',
    'dmp',
    'content',
    'maxifier',
    'video',
]
class AbstractUsageUnitQuerySet(models.QuerySet):
    """QuerySet helpers for reading the ``slug``, ``title`` and ``id`` columns."""

    def get_slugs(self):
        """Return a flat values-list of the ``slug`` column."""
        return self.values_list('slug', flat=True)

    def get_slugs_with_titles(self):
        """Return ``(slug, title)`` pairs ordered by ``title``."""
        return self.values_list('slug', 'title').order_by('title')

    def get_slugs_and_titles(self):
        """Return two parallel lists ``(slugs, titles)`` ordered by ``title``.

        An empty queryset yields ``([], [])``. Unpacking ``zip(*rows)`` into
        two names raises ``ValueError`` when there are no rows, so that case
        is handled explicitly.
        """
        pairs = list(self.get_slugs_with_titles())
        if not pairs:
            return [], []
        slugs, titles = zip(*pairs)
        return list(slugs), list(titles)

    def get_slug_to_id_mapping(self):
        """Return a ``{slug: id}`` dict for the rows in this queryset."""
        return dict(self.values_list('slug', 'id'))
class BaseUsageUnitManager(models.Manager):
    """Manager whose base queryset is narrowed to the model's ``PRODUCT``."""

    def get_queryset(self):
        """Build the base queryset for ``self.model``.

        When the model class defines a truthy ``PRODUCT``, only rows whose
        ``product`` column equals it are returned; otherwise no filter is
        applied.
        """
        queryset = AbstractUsageUnitQuerySet(self.model, using=self._db)
        product = self.model.PRODUCT
        if not product:
            return queryset
        return queryset.filter(product=product)
class UsageUnit(models.Model):
    """Billing usage unit identified by a unique slug.

    Classes may set ``PRODUCT`` to a product name; the default manager then
    filters on it and new instances are stamped with it.
    """

    # Product this class is bound to; None means "not bound to a product".
    PRODUCT = None

    slug = models.SlugField(verbose_name='slug', unique=True)
    title = models.CharField(verbose_name='title', max_length=50)
    note = models.TextField(verbose_name='note', blank=True)
    uom = models.CharField(
        verbose_name='uom', max_length=50, blank=True, null=True,
    )
    product = models.CharField(
        verbose_name='product', max_length=50, blank=True, null=True,
    )

    objects = BaseUsageUnitManager.from_queryset(AbstractUsageUnitQuerySet)()

    class Meta:
        app_label = 'billing'
        verbose_name = 'billing usage unit'
        verbose_name_plural = 'billing usage units'

    def __init__(self, *args, **kwargs):
        """Initialise the instance and stamp ``product`` from ``PRODUCT``.

        The stamp is applied only when ``PRODUCT`` is truthy (the same check
        the manager uses). Assigning unconditionally would replace a loaded or
        caller-supplied ``product`` with ``None`` on classes that leave
        ``PRODUCT`` unset, and a later ``save()`` would persist that loss.
        """
        super().__init__(*args, **kwargs)
        if self.PRODUCT:
            self.product = self.PRODUCT

    def __str__(self):
        return str(self.title)

    @classmethod
    def hour_unit(cls):
        """Return the first unit whose slug is ``'hour'``, or ``None``."""
        return cls.objects.filter(slug='hour').first()

    def get_uom(self):
        """Return the unit-of-measure label.

        Uses the stored ``uom`` when it is non-empty; otherwise falls back to
        ``'Hours'`` for the ``'hour'`` slug and ``'Impressions'`` for any
        other slug.
        """
        if self.uom:
            return self.uom
        return 'Hours' if self.slug == 'hour' else 'Impressions'
class Settings(SingletonModel):
    """Singleton model holding the Cxense login and secret."""

    cxense_login = models.CharField(
        verbose_name='Cxense login',
        max_length=50,
        blank=True,
    )
    # The secret is stored through EncryptedPasswordField.
    cxense_secret = EncryptedPasswordField(
        'Cxense secret',
        max_length=50,
        blank=True,
    )

    class Meta:
        verbose_name = 'Settings'

    def __str__(self):
        return 'Settings'
class TzOverride(models.Model):
    """Maps a customer-site timezone name to the timezone name used instead."""

    tz = models.CharField(
        verbose_name='Customer site tz', max_length=20, unique=True,
    )
    tz_override = models.CharField(verbose_name='Overrided tz', max_length=20)
    comment = models.CharField(
        verbose_name='Comment', max_length=255, blank=True,
    )

    class Meta:
        app_label = 'billing'
        verbose_name = 'customer timezone'
        verbose_name_plural = 'customer timezones'

    def __str__(self):
        return '{} -> {}'.format(self.tz, self.tz_override)

    def clean(self):
        """Reject a ``tz_override`` that pytz does not recognise.

        Raises:
            ValidationError: if ``pytz.timezone`` does not know the name.
        """
        override = self.tz_override
        try:
            pytz.timezone(override)
        except UnknownTimeZoneError:
            raise ValidationError('{} is wrong timezone'.format(override))

    @classmethod
    @memoize()
    def get_timezone(cls, timezone):
        """Resolve ``timezone`` to a pytz timezone, consulting overrides.

        The name is first tried directly with pytz. If pytz does not know it,
        the ``TzOverride`` row whose ``tz`` equals the name is looked up and
        its ``tz_override`` is resolved instead.

        Raises:
            ValueError: if no override row exists, or if the override itself
                is not a timezone known to pytz.
        """
        try:
            return pytz.timezone(timezone)
        except UnknownTimeZoneError:
            # Fall back to the override table for names pytz rejects.
            try:
                override = cls.objects.get(tz=timezone)
            except cls.DoesNotExist:
                raise ValueError(
                    'Timezone "{}" override does not exist'.format(timezone)
                )
            try:
                return pytz.timezone(override.tz_override)
            except UnknownTimeZoneError:
                raise ValueError(
                    'Timezone override "{}" is wrong timezone'.format(
                        override.tz_override
                    )
                )
class TaskLog(models.Model):
    """Log text recorded for a named task at a given date/time."""

    name = models.CharField(verbose_name='task name', max_length=255)
    ymd = models.DateTimeField(verbose_name='task date')
    log = models.TextField(verbose_name='log')

    class Meta:
        verbose_name = 'task log'
        verbose_name_plural = 'task logs'
        ordering = ['ymd']

    def __str__(self):
        return '%s - %s' % (self.ymd, self.name)
class CxenseUser(models.Model):
    """Cxense API credentials (username and API key) tagged with a role."""

    ROLE_DEFAULT = 'default'
    ROLE_UI_USAGE = 'ui_usage'
    ROLES = (
        (ROLE_DEFAULT, 'Default'),
        (ROLE_UI_USAGE, 'UI Usage'),
    )

    username = models.CharField(verbose_name='Username', max_length=255)
    api_key = EncryptedPasswordField('Api key', max_length=255)
    is_admin = models.BooleanField(verbose_name='Admin', default=False)
    # unique=True means at most one row can exist per role value.
    role = models.CharField(
        max_length=255,
        choices=ROLES,
        unique=True,
    )

    class Meta:
        verbose_name = 'Cxense user'

    def __str__(self):
        return self.username
class CxenseCustomerQuerySet(models.QuerySet):
    """QuerySet helpers keyed on the ``cx_id`` and ``archived`` columns."""

    def with_cx_id(self):
        """Return only rows whose ``cx_id`` is neither empty nor NULL."""
        non_empty = self.exclude(cx_id='')
        return non_empty.exclude(cx_id__isnull=True)

    def get_archived_customers_ids(self):
        """Return the ``cx_id`` values of archived rows as a list."""
        archived_rows = self.filter(archived=True)
        return list(archived_rows.values_list('cx_id', flat=True))
class CxenseCustomer(models.Model):
    """Cxense customer with external ids, feature flags and upload options."""

    cx_id = models.CharField(
        verbose_name='cxense id',
        unique=True,
        null=True,
        blank=True,
        max_length=40,
    )
    name = models.CharField(verbose_name='name', max_length=255)
    sales_force_id = models.CharField(
        verbose_name='salesforce id',
        max_length=50,
        blank=True,
        default='',
        db_index=True,
    )
    zuora_id = models.CharField(
        verbose_name='zuora id',
        max_length=50,
        blank=True,
        db_index=True,
    )
    manual = models.BooleanField(verbose_name='manual', default=False)
    # Self-referencing link; deleting the parent cascades to its children.
    parent = models.ForeignKey(
        'self',
        null=True,
        blank=True,
        on_delete=models.CASCADE,
    )
    description = models.CharField(
        verbose_name='description', max_length=255, blank=True,
    )
    lts_days = models.PositiveIntegerField(null=True, blank=True)
    use_personification = models.BooleanField(
        verbose_name='person-n', default=True,
    )

    # Per-product feature flags.
    feature_dmp = models.BooleanField(default=False)
    feature_content = models.BooleanField(default=False)
    feature_cce = models.BooleanField(default=False)
    feature_search = models.BooleanField(default=False)

    archived = models.BooleanField(default=False)
    exclude_from_zuora_upload_dmp = models.BooleanField(
        verbose_name='Exclude from Zuora upload (DMP)', default=False,
    )
    exclude_from_zuora_upload_content = models.BooleanField(
        verbose_name='Exclude from Zuora upload (Content)', default=False,
    )

    objects = CxenseCustomerQuerySet.as_manager()

    class Meta:
        verbose_name = 'Cxense customer'
        verbose_name_plural = 'Cxense customers'

    def __str__(self):
        return self.name

    def get_sitegroups(self):
        """Return all objects on the ``site_groups`` related manager."""
        return self.site_groups.all()

    def get_sitegroups_ids(self):
        """Return the truthy ``group_id`` values of the related site groups."""
        group_ids = self.site_groups.values_list('group_id', flat=True)
        return [group_id for group_id in group_ids if group_id]

    def get_sites(self):
        """Return all objects on the ``sites`` related manager."""
        return self.sites.all()

    def get_sites_ids(self):
        """Return the distinct, truthy ``site_id`` values of related sites."""
        site_ids = self.sites.values_list('site_id', flat=True).distinct()
        return [site_id for site_id in site_ids if site_id]

    def get_widgets_ids(self):
        """Return the distinct, truthy ``widgets`` values of site groups."""
        widget_ids = self.site_groups.values_list('widgets', flat=True).distinct()
        return [widget_id for widget_id in widget_ids if widget_id]
class AbstractUsageRecordQuerySet(QuerySetMixin, models.QuerySet):
    """QuerySet with per-unit quantity sums and raw bulk-delete helpers."""

    def annotate_usage_unit_qty_sum(self, fieldname, unit_slug):
        """Annotate ``fieldname`` with the sum of ``qty`` for one unit slug.

        Rows whose ``unit.slug`` differs from ``unit_slug`` contribute 0.

        Args:
            fieldname: name of the annotation to add.
            unit_slug: slug of the usage unit whose quantities are summed.
        """
        qty_for_unit = Case(
            When(unit__slug=unit_slug, then=F('qty')),
            default=Value(0),
            # BigIntegerField matches the declared type of ``qty``.
            output_field=models.BigIntegerField(),
        )
        return self.annotate(**{fieldname: Sum(qty_for_unit)})

    def annotate_usage_units_qty_sum(self, *args, **kwargs):
        """Annotate quantity sums for several unit slugs at once.

        Positional arguments are slugs that are also used as the annotation
        names. Keyword arguments map ``annotation_name=slug``.

        Raises:
            ValueError: if positional and keyword arguments are mixed.
        """
        if args and kwargs:
            raise ValueError(
                'Pass unit slugs either positionally or as keyword '
                'arguments, not both'
            )
        qs = self
        for slug in args:
            qs = qs.annotate_usage_unit_qty_sum(fieldname=slug, unit_slug=slug)
        for fieldname, slug in kwargs.items():
            qs = qs.annotate_usage_unit_qty_sum(fieldname=fieldname, unit_slug=slug)
        return qs

    def large_delete(self, delete_date: date, customers):
        """Delete the rows for ``delete_date``, one customer at a time.

        Args:
            delete_date: value matched against the ``date`` column.
            customers: iterable of objects exposing an ``id`` attribute.
        """
        for customer in customers:
            self.filter(date=delete_date, customer_id=customer.id).fast_delete()

    def fast_delete(self):
        """Delete through the queryset's private ``_raw_delete`` hook."""
        return self._raw_delete(self.db)
class AbstractUsageRecord(models.Model):
    """Abstract base for dated usage quantities recorded against a unit."""

    unit = models.ForeignKey(
        UsageUnit,
        related_name='%(class)ss',
        verbose_name='usage unit',
        db_index=True,
        on_delete=models.CASCADE,
    )
    date = models.DateField(verbose_name='date', db_index=True)
    zuora_id = models.CharField(
        verbose_name='Zuora ID', max_length=50, blank=True, db_index=True,
    )
    qty = models.BigIntegerField(verbose_name='quantity', default=0)
    note = models.TextField(verbose_name='note', blank=True)

    objects = AbstractUsageRecordQuerySet.as_manager()

    class Meta:
        abstract = True

    @classmethod
    def get_usage_cal(cls):
        """Return ``(date, row_count)`` pairs for the last 366 days.

        Rows are grouped by ``date`` and ordered by it.
        """
        since = datetime.now() - timedelta(days=366)
        rows_per_day = (
            cls.objects
            .filter(date__gte=since)
            .values('date')
            .annotate(value=Count('date'))
            .order_by('date')
        )
        return rows_per_day.values_list('date', 'value')

    @classmethod
    def get_sum_usage(cls, start_date, end_date, raw=False):
        """Not implemented on the abstract base."""
        raise NotImplementedError

    @classmethod
    def get_subscription_charges(cls, start_date: date, end_date: date, batches=None):
        """Not implemented on the abstract base."""
        raise NotImplementedError
class HardwareUsageRecord(models.Model):
    """Quantity of one hardware unit for a customer in a given month/year."""

    UNIT_CPU_MS = 'apiServerCpuMs'
    UNIT_BACKEND_COST = 'backendCost'
    UNIT_COUNT = 'count'
    UNIT_LATENCY = 'latency'
    # Each choice uses the same string as stored value and as label.
    UNIT_CHOICES = (
        (UNIT_CPU_MS, UNIT_CPU_MS),
        (UNIT_BACKEND_COST, UNIT_BACKEND_COST),
        (UNIT_COUNT, UNIT_COUNT),
        (UNIT_LATENCY, UNIT_LATENCY),
    )
    # Stored values only, in declaration order.
    UNITS = tuple(value for value, _label in UNIT_CHOICES)

    customer = models.ForeignKey(
        CxenseCustomer,
        related_name='hardware_usage_records',
        on_delete=models.CASCADE,
    )
    unit = models.CharField(
        max_length=100,
        choices=UNIT_CHOICES,
        db_index=True,
    )
    month = models.PositiveSmallIntegerField()
    year = models.PositiveSmallIntegerField()
    qty = models.BigIntegerField(verbose_name='quantity', default=0)

    class Meta:
        # One row per customer, unit and calendar month.
        unique_together = [
            ('customer', 'unit', 'month', 'year'),
        ]
class GraphiteSettings(SingletonModel):
    """Singleton model holding a server address and port (default 2003)."""

    server = models.CharField(
        verbose_name='Server address',
        max_length=256,
    )
    port = models.PositiveIntegerField(
        verbose_name='Server port',
        default=2003,
    )
class UIUsageForCustomerSite(models.Model):
    """UI usage counters per customer, product site and date."""

    PRODUCTS = (
        # (site_id, name)
        ('1139620337853314232', 'DMP New'),
        ('9222354774360837876', 'DMP Old'),
        ('9222333114891684880', 'Content'),
        ('9222263900732340922', 'Insight'),
    )
    # Site ids only, in declaration order.
    SITE_IDS = [product_site_id for product_site_id, _name in PRODUCTS]

    customer = models.ForeignKey(
        CxenseCustomer,
        on_delete=models.CASCADE,
    )
    site_id = models.CharField(
        max_length=19,
        choices=PRODUCTS,
    )
    date = models.DateField(db_index=True)
    active_users = models.PositiveIntegerField()
    user_sessions = models.PositiveIntegerField()
    user_page_views = models.PositiveIntegerField()

    class Meta:
        index_together = [
            ('customer', 'date'),
        ]
        # One row per customer, site and date.
        unique_together = [
            ('customer', 'site_id', 'date'),
        ]
class UIUsageForCustomer(models.Model):
    """UI usage counters per customer and date."""

    customer = models.ForeignKey(
        CxenseCustomer,
        on_delete=models.CASCADE,
    )
    date = models.DateField(db_index=True)
    active_users = models.PositiveIntegerField()
    user_sessions = models.PositiveIntegerField()
    user_page_views = models.PositiveIntegerField()

    class Meta:
        # One row per customer and date.
        unique_together = [
            ('customer', 'date'),
        ]
class CustomerAggregatedData(models.Model):
    """Flat per-customer record of aggregated figures for one month/year."""

    # Customer identity, stored as plain values rather than a foreign key.
    cx_id = models.CharField(max_length=255)
    name = models.CharField(max_length=255)
    zuora_id = models.CharField(max_length=255)
    sales_force_id = models.CharField(max_length=255)
    country = models.CharField(max_length=255)

    # Feature flags.
    feature_dmp = models.BooleanField()
    feature_content = models.BooleanField()
    feature_cce = models.BooleanField()
    feature_search = models.BooleanField()

    lts_days = models.PositiveIntegerField()

    # Usage counters.
    active_users = models.BigIntegerField()
    users_sessions = models.BigIntegerField()
    users_page_views = models.BigIntegerField()
    page_views = models.BigIntegerField()
    dmp_events = models.BigIntegerField()
    content_imps = models.BigIntegerField()
    cce_imps = models.BigIntegerField()
    backend_cost = models.BigIntegerField()
    api_server_cpu_ms = models.BigIntegerField()
    latency = models.BigIntegerField()
    requests_number = models.BigIntegerField()
    sitegroups_number = models.PositiveIntegerField()
    sites_number = models.PositiveIntegerField()
    widgets_number = models.PositiveIntegerField()
    search_queries = models.PositiveIntegerField()

    # Invoiced amounts.
    invoiced_ytd = models.DecimalField(max_digits=20, decimal_places=2)
    invoiced_avg_monthly = models.DecimalField(max_digits=20, decimal_places=2)

    month = models.PositiveSmallIntegerField()
    year = models.PositiveSmallIntegerField()
    date = models.DateField(db_index=True)  # for range filter

    class Meta:
        # One row per customer id and calendar month.
        unique_together = [
            ('cx_id', 'year', 'month'),
        ]
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment