Skip to content

Instantly share code, notes, and snippets.

@dougharris
Created October 25, 2022 16:46
Show Gist options
  • Save dougharris/5b867befa003a5f7de87ffc5b15333c7 to your computer and use it in GitHub Desktop.
Save dougharris/5b867befa003a5f7de87ffc5b15333c7 to your computer and use it in GitHub Desktop.
import logging
import re
from collections import OrderedDict
from datetime import date, timedelta
from django import forms
from django.conf import settings
from django.contrib import admin, messages
from django.contrib.admin.models import DELETION, LogEntry
from django.contrib.auth import get_permission_codename
from django.contrib.auth.admin import UserAdmin as OrigUserAdmin
from django.contrib.contenttypes.models import ContentType
from django.core import validators
from django.core.exceptions import PermissionDenied, ValidationError
from django.db import models
from django.db.models import Count, Q
from django.db.models.functions import TruncDate
from django.http import HttpResponseRedirect
from django.shortcuts import redirect, render
from django.urls import path, reverse
from django.utils import timezone
from django.utils.translation import gettext_lazy as _
from bs4 import BeautifulSoup
from rest_framework.authtoken.models import Token
from ti_service.models import ProfileAttributes
from myapp.models.fields import LegacyCompatibleTextField
from myapp.models.user import (
CLIENT_SUPPORT_GROUP_NAME,
EXADMIN_GROUP_NAME,
USER_SUPPORT_GROUP_NAME,
)
from myapp.tasks import (
maybe_sync_object_to_service,
sync_object_to_email_provider_task,
unsubscribe_object_from_email_provider_lists_task,
)
from .mailinglist_utils import get_mailinglist_providers
from .models import (
ClassAAO,
ClassAA,
ClassAM,
ClassACT,
ClassBP,
ClassEPN,
ClassFA,
ClassGS,
ClassPAK,
ClassPC,
ClassPHRP,
ClassPP,
ClassPU,
ClassPZ,
ClassP,
ClasPER,
ClassRR,
ClassT,
ClassTC,
ClassV,
)
from .models.vendor_task_report import GPGConfiguration, VendorReportSFTPEndpoint
logger = logging.getLogger(__name__)
class ReadOnlyAdminMixin:
def get_readonly_fields(self, request, obj=None):
"""To keep us from messing up the data, events should be ALL read only
... warning::
If you don't make `popcorn` read-only, Django's gonna try to make a
dropdown with ALL popcorn records in it, which takes a LONG time.
"""
return [f.name for f in self.model._meta.fields]
class UserInline(admin.StackedInline):
model = ClassPU
extra = 0
fields = ('external_id', )
formfield_overrides = {
models.TextField: {
'widget': forms.Textarea(attrs={'rows': 1}),
},
}
def has_add_permission(self, request, obj=None):
return False
def has_delete_permission(self, request, obj=None):
return False
class FormAnswerInline(admin.StackedInline):
model = ClassFA
extra = 0
group_readonly_fields = OrderedDict([
(USER_SUPPORT_GROUP_NAME, (
'form_question',
'form_question_option',
'answer_text',
))
])
group_fields = OrderedDict([(
USER_SUPPORT_GROUP_NAME,
group_readonly_fields[USER_SUPPORT_GROUP_NAME],
)])
formfield_overrides = {
LegacyCompatibleTextField: {
'widget': forms.Textarea(attrs={
'rows': 2, 'cols': 80
})
},
}
def get_test_account_query(field_namespace=None):
"""Get test account query with optional field namespace.
Ex. if field_namespace is 'popcorn', lookup 'popcorn__email__endswith' instead of the default
'email__endswith'
"""
def _Q(**kwargs):
nonlocal field_namespace
if field_namespace:
namespace = f'{field_namespace}__'
else:
namespace = ''
new_kwargs = {f'{namespace}{k}': v for k, v in kwargs.items()}
return Q(**new_kwargs)
email_queries = None
for domain in settings.ADMIN_TEST_USER_EMAIL_DOMAINS:
single_email_query = _Q(email__endswith=domain)
if email_queries:
email_queries = email_queries | single_email_query
else:
email_queries = single_email_query
# yapf: disable
test_account_query = (email_queries
|
_Q(signup_ip__in=settings.ADMIN_TEST_USER_SIGNUP_IPS)
|
_Q(username__startswith='test'))
# yapf: enable
return test_account_query
class TestAccountStatusFilter(admin.SimpleListFilter):
"""Provide list filtering using our test account heuristic, and default to hiding test users."""
title = 'test account status'
parameter_name = 'test_account_status'
def lookups(self, request, module_admin):
"""Hide test accounts by default."""
return (
(None, 'Hide test accounts'),
('All', 'Show all account types'),
('Only', 'Show only test accounts'),
)
def choices(self, changelist):
"""Remove the All option from the display and substitute our own Hide option."""
return [{
'selected': self.value() is None,
'query_string': changelist.get_query_string(remove=[self.parameter_name]),
'display': 'Hide test accounts'
},
{
'selected': self.value() == 'Only',
'query_string': changelist.get_query_string({self.parameter_name: 'Only'}),
'display': 'Show only test accounts',
},
{
'selected': self.value() == 'All',
'query_string': changelist.get_query_string({self.parameter_name: 'All'}),
'display': 'Show all account types',
}]
def queryset(self, request, queryset):
"""Hide test accounts by default."""
value = self.value()
if value is None:
return queryset.filter(~get_test_account_query())
elif value == 'Only':
return queryset.filter(get_test_account_query())
return queryset
class DisabledAccountFilter(admin.SimpleListFilter):
"""Allow filtering by account disabled status, and default to hiding disabled accounts."""
title = 'disabled status'
parameter_name = 'disabled'
def lookups(self, request, module_admin):
"""Hide test accounts by default."""
return (
(None, 'Hide disabled accounts'),
('All', 'Show all account types'),
('Only', 'Show only disabled accounts'),
)
def choices(self, changelist):
"""Remove the All option from the display and substitute our own Hide option."""
return [{
'selected': self.value() is None,
'query_string': changelist.get_query_string(remove=[self.parameter_name]),
'display': 'Hide disabled accounts'
},
{
'selected': self.value() == 'Only',
'query_string': changelist.get_query_string({self.parameter_name: 'Only'}),
'display': 'Show only disabled accounts',
},
{
'selected': self.value() == 'All',
'query_string': changelist.get_query_string({self.parameter_name: 'All'}),
'display': 'Show all account types',
}]
def queryset(self, request, queryset):
"""Hide disabled accounts by default."""
value = self.value()
if value is None:
return queryset.exclude(disabled=True)
elif value == 'Only':
return queryset.filter(disabled=True)
return queryset
class ActualPCFilter(admin.SimpleListFilter):
"""Allow filtering by only actual vendors. Don't include the extra companies."""
title = 'Vendors'
parameter_name = 'vendor'
def lookups(self, request, module_admin):
"""Show popcorns associated with any (or no) vendor by default."""
queryset = ClassPC.objects.filter(is_vendor=True).order_by('name')
choices = [(p.candidate_id, p.name) for p in queryset]
return choices
def queryset(self, request, queryset):
"""Show popcorns associated with any (or no) vendor by default."""
value = self.value()
if value is not None:
return queryset.filter(chosen_vendor=value)
return queryset
class ClassPAdmin(admin.ModelAdmin):
EMAIL_DOMAIN_WHITELIST = (
'yahoo.com',
'gmail.com',
'hotmail.com',
'aol.com',
'live.com',
'comcast.net',
'ymail.com',
'msn.com',
'sbcglobal.net',
'ihavenet.com',
'mail.com',
'verizon.net',
'rocketmail.com',
'att.net',
'c895fm.com',
'u2audio.com',
'bellsouth.net',
'cox.net',
'techemail.com',
'dcemail.com',
'gmx.com',
)
ACTIVATED = 4
DEACTIVATED = 5
BANNED = 6
VIEWED = 7
EMAIL_PROVIDER_FIELDS = (
'email_provider_id',
'email_provider_contact_digest',
'email_provider_contact_sync_time',
)
change_form_template = 'admin/change_object_form.html'
exclude = ('account_annotation', )
readonly_fields = (
'vendor_product_field',
'popcorn_id',
'secure_proxy_email_address',
'signup_date',
'is_quitdate_invalid',
'signup_user_agent',
'last_login_date',
'ning_id',
'signup_ip',
'old_id',
'hipaa_release_signed',
'hipaa_release_revision',
'eligibility_last_checked',
'enrollment_last_checked',
'previous_vendor',
'account_status',
'external_vendor_id',
)
def get_fieldsets(self, request, obj=None):
# if the AC fields are in the fields a user has access to, show them
if self.EMAIL_PROVIDER_FIELDS[0] in self.get_fields(request):
fieldsets = (
(
'Email Provider',
{
'classes': ('collapse', ),
'fields': self.EMAIL_PROVIDER_FIELDS,
'description': "These fields are used to manage active campaign data. "
"They are set by code and so are read-only."
}
),
)
return fieldsets
else:
return super().get_fieldsets(request, obj=obj)
list_display = (
'popcorn_id',
'username',
'get_email',
'get_mobile_number',
'chosen_vendor',
'vendor_product_field',
'_is_sponsored',
'account_status',
)
list_filter = (TestAccountStatusFilter, DisabledAccountFilter, ActualVendorFilter)
list_display_links = (
'popcorn_id',
'username',
'get_email',
)
search_fields = (
'popcorn_id',
'username',
'email',
'mobile_number',
'legal_first_name',
'legal_last_name',
)
inlines = [
UserInline,
FormAnswerInline,
]
def __init__(self, model, admin_site):
self._whitelisted_vendor_domains = None
super().__init__(model, admin_site)
def _is_sponsored(self, obj):
value = obj.is_sponsored
if value is None:
value = False
return value
_is_sponsored.boolean = True
def get_form(self, request, obj=None, change=False, **kwargs):
"""Use the initial obj to determine what the validators should be on the username
field, to validate new usernames but keep support for old restrictions on pre-existing."""
form = super().get_form(request, obj=obj, change=change, **kwargs)
try:
username_field = form.base_fields['username']
except KeyError:
# ClassPU does not have permission to edit username field so validation
# does not apply.
return form
username_field.validators = [
validators.MaxLengthValidator(15),
validators.MinLengthValidator(3),
validators.RegexValidator(
re.compile(r'^[-a-zA-Z0-9_]+\Z'),
_(
"Enter a valid username consisting of letters, numbers, "
"underscores, or hyphens."
),
)
]
if obj and obj.username:
is_legacy_format = len(obj.username) > 15 or '.' in obj.username
if is_legacy_format:
username_field.validators = [
validators.MaxLengthValidator(30),
validators.MinLengthValidator(3),
validators.RegexValidator(
re.compile(r'^[-a-zA-Z0-9_\.]+\Z'),
_(
"Enter a valid username consisting of letters, numbers, "
"underscores, hyphens, or periods."
),
)
]
return form
def get_urls(self):
urls = super().get_urls()
custom_urls = [
path(
'<int:pk>/activate/',
self.admin_site.admin_view(self.activate_object_view),
name='activate_object'
),
path(
'<int:pk>/deactivate/',
self.admin_site.admin_view(self.deactivate_object_view),
name='deactivate_object'
),
]
return custom_urls + urls
def changeform_view(self, request, object_id=None, form_url='', extra_context=None):
# Display account status alert if disabled
extra_context = extra_context or {}
try:
popcorn = ClassP.objects.get(pk=object_id)
except ClassP.DoesNotExist:
# Adding a new ClassP, skip ahead to super().
pass
else:
# Set request context for VendorCandidateAdmin permissions
request._updating_object = object_id is not None
if request.user.has_perm('myapp.view_object') or \
request.user.has_perm('myapp.view_taskcompletion'):
if popcorn.chosen_vendor.uses_points_achievement:
extra_context['uses_points'] = True
ctx = self.program_completion_points_based(popcorn)
else:
extra_context['uses_points'] = False
ctx = self.program_completion_task_based(popcorn)
extra_context.update(ctx)
if request.user.has_perm('myapp.view_object'):
extra_context['chat_history'] = popcorn.event_records.filter(
popcorn=popcorn, event_tag=ClasPER.EventTypes.CHATTED_WITH_COACH
).order_by('-event_time')[:10]
if request.method == 'GET':
self._applog_object_action(popcorn, request, "viewed")
self._create_object_logentry(
popcorn,
request.user,
PopcornAdmin.VIEWED,
message=f"[viewed from {request.META['REMOTE_ADDR']}]"
)
response = super(PopcornAdmin, self).changeform_view(
request,
object_id=object_id,
form_url=form_url,
extra_context=extra_context,
)
self._whitelisted_vendor_domains = None
return response
def program_completion_task_based(self, popcorn):
"""
Returns extra template context for users with task completion-based program
"""
ctx = {}
required_tasks = ClassT.objects.filter(required_for_plan_completion=True)
popcorn_required_tasks = []
cached_task_ids = popcorn.current_completed_tasks_pks
completed_tasks = popcorn.completed_tasks
for task in required_tasks:
completed = task.task_id in cached_task_ids
resets_each_year = task.resets_at_benefit_period_end
if completed:
completed_date = completed_tasks.filter(
task_id=task.task_id
).latest('completion_date').completion_date
else:
completed_date = ''
popcorn_required_tasks.append({
'name': task.task_description,
'completed': completed,
'completed_date': completed_date,
'resets': resets_each_year,
})
ctx['required_tasks'] = popcorn_required_tasks
if popcorn.get_has_completed_program():
ctx['plan_completed'] = True
other_completions = []
for task_completion in completed_tasks.exclude(
task__required_for_plan_completion=True
):
other_completions.append({
'name': task_completion.task.task_description,
'current_year': task_completion in popcorn.current_completed_tasks.all(),
'completed_date': task_completion.completion_date,
'resets': task_completion.task.resets_at_benefit_period_end,
})
ctx['other_completions'] = other_completions
return ctx
def program_completion_points_based(self, popcorn):
"""
Returns extra template context for users with points-based program
"""
activities = ClassACT.objects.all()
for activity in activities:
records = popcorn.event_records
return {
'plan_completed': True,
'activities': activities,
}
def save_form(self, request, form, change):
"""
Send a report to Omniture if any fields it cares about have changed, or a new
ClassP was created.
"""
popcorn = super().save_form(request, form, change)
if not change or any(f in ClassP.OMNITURE_TRACKED_FIELDS for f in form.changed_data):
popcorn.send_omniture_report()
return popcorn
def save_model(self, request, obj, form, change):
"""
* Sync popcorn to Email Provider if they were opted in to email or unsubscribe
them if they are newly opted out.
* Updates existing Service users if the 'chosen_vendor' changed.
"""
super().save_model(request, obj, form, change)
if obj.email_optin and 'email_provider' in get_mailinglist_providers(obj):
# Sync new and existing contacts in Email Provider to keep contacts up to date.
# Subscribe to the default list(s) and tag(s) if they are newly opted in.
if not change or 'email_optin' in form.changed_data:
sync_object_to_email_provider_task.delay(
obj.popcorn_id,
list_ids=settings.EMAIL_PROVIDER_LIST_IDS,
tag_ids=settings.EMAIL_PROVIDER_TAG_IDS,
)
self._email_optin_task_completion(obj)
else:
sync_object_to_email_provider_task.delay(obj.popcorn_id)
if not obj.email_optin and 'email_optin' in form.changed_data:
ClasPER.record_email_opt_out(obj, 'Admin')
if obj.email_provider_id:
unsubscribe_object_from_email_provider_lists_task.delay(obj.popcorn_id)
if change and obj.service_id:
attributes = []
if 'chosen_vendor' in form.changed_data:
attributes += [
ProfileAttributes.COMPANY_NAME,
ProfileAttributes.NRT,
ProfileAttributes.SPONSORED_STATUS,
]
if 'quitdate' in form.changed_data:
attributes += [
ProfileAttributes.QUITDATE,
]
maybe_sync_object_to_service(obj, attributes)
def _email_optin_task_completion(self, popcorn):
ClasPER.record_email_opt_in(
popcorn=popcorn,
location='Admin',
)
email_optin_task = ClassT.objects.filter(task_name=ClasPER.EventTypes.EMAIL_OPT_IN
).first()
if email_optin_task:
ClassTC.record_task_completion(
popcorn=popcorn,
task=email_optin_task,
subtasks=[],
event_source=ClasPER.EventSources.HTTP_API,
)
def get_readonly_fields(self, request, obj=None):
"""
Set is_data_phi to be read-only if it is True.
Admins are allowed to set a popcorn's health information to be protected,
but setting health information to be unprotected is a dangerous change,
so it is not allowed.
"""
readonly_fields = super().get_readonly_fields(request, obj=obj)
if obj and obj.is_data_phi is True:
readonly_fields += ("is_data_phi", )
readonly_fields += self.EMAIL_PROVIDER_FIELDS
return readonly_fields
def log_addition(self, request, object, message):
try:
message = f"{message} [added from {request.META['REMOTE_ADDR']}]"
except KeyError:
message = f"{message} [added from undetermined IP address]"
self._applog_object_action(object, request, "added")
super().log_addition(request, object, message)
def log_change(self, request, object, message):
try:
message = f"{message} [changed from {request.META['REMOTE_ADDR']}]"
except KeyError:
message = f"{message} [changed from undetermined IP address]"
self._applog_object_action(object, request, "changed")
super().log_change(request, object, message)
def log_deletion(self, request, object, object_repr):
"""
Log that an object will be deleted.
This does _not_ call its superclass's version because the default version does not
included the change message.
"""
message = f"[deleted from {request.META['REMOTE_ADDR']}]"
popcorn_type = ContentType.objects.get(app_label='plan', model='popcorn')
self._applog_object_action(object, request, "deleted")
return LogEntry.objects.log_action(
user_id=request.user.pk,
content_type_id=popcorn_type.id,
object_id=object.pk,
object_repr=object_repr,
action_flag=DELETION,
change_message=message,
)
@property
def whitelisted_vendor_domains(self):
if not self._whitelisted_vendor_domains:
self._whitelisted_vendor_domains = list(
ClassPC.objects.values_list('email_domain', flat=True)
)
return self._whitelisted_vendor_domains
def external_vendor_id(self, obj):
ext_id = obj.user.external_id or ""
if obj.chosen_vendor and obj.chosen_vendor.member_id_label:
ext_id = f'{ext_id} ({obj.chosen_vendor.member_id_label})'
return ext_id
external_vendor_id.short_description = 'External ID'
def vendor_product_field(self, obj):
return obj.vendor_product
vendor_product_field.short_description = 'Product'
def get_email(self, obj):
if not obj.email or not obj.is_data_phi:
return obj.email
# Obscure first part of email
username, domain, *_ = obj.email.split("@")
obscured_username = username[:1] + len(username[1:]) * "*"
# If domain is in whitelist, return domain without obscuring
if domain in self.EMAIL_DOMAIN_WHITELIST or domain in self.whitelisted_vendor_domains:
return f'{obscured_username}@{domain}'
# If "." character is not in the domain string, leave it as it is
# (last part of the domain is not obscured)
if "." not in domain:
return f'{obscured_username}@{domain}'
# Obscure email domain
obscured_domains, last_domain = domain.rsplit(".", 1)
obscured_domains = (
obscured_domains[:1] + len(obscured_domains[1:-1]) * "*" + obscured_domains[-1]
)
return f"{obscured_username}@{obscured_domains}.{last_domain}"
def get_mobile_number(self, obj):
if not obj.mobile_number or not obj.is_data_phi:
return obj.mobile_number
number_parts = tuple(re.findall(r'\d{4}$|\d{3}', obj.mobile_number))
return f"***-***-{number_parts[-1]}"
def get_queryset(self, request):
if request.path == reverse('admin:plan_object_changelist'):
show_results = False
allowed_filter_names = []
for _filter in self.list_filter:
try:
allowed_filter_names.append(_filter.parameter_name)
except AttributeError:
allowed_filter_names.append(_filter)
allowed_filters = '|'.join(allowed_filter_names)
filter_pattern = re.compile(fr'^({allowed_filters})__\w*exact$')
for k, v in request.GET.items():
if v and (k == 'q' or filter_pattern.match(k)):
show_results = True
if not show_results:
return ClassP.objects.none()
return super(PopcornAdmin, self).get_queryset(request).select_related('chosen_vendor')
get_email.short_description = "Email"
get_mobile_number.short_description = "Mobile Number"
def _is_test_account(self, popcorn):
# yapf: disable
return (
(popcorn.email and
any([popcorn.email.endswith(d) for d in settings.ADMIN_TEST_USER_EMAIL_DOMAINS])) or
(popcorn.signup_ip and
popcorn.signup_ip in settings.ADMIN_TEST_USER_SIGNUP_IPS) or
popcorn.username.startswith('test')
)
# yapf: enable
def _applog_object_action(self, popcorn, request, action_name):
"""For compliance reasons, we want to know who has accessed popcorn data in app logs"""
user = request.user
ip_addr = request.META.get('REMOTE_ADDR', "undetermined IP address")
user_groups = ', '.join(user.groups.values_list('name', flat=True)) or "no groups"
if settings.ALWAYS_LOG_ADMIN_ACTIVITY or not self._is_test_account(popcorn):
logger.info(
f"POPCORN ACCESS: {user.username} in groups {user_groups} {action_name} "
f"popcorn {popcorn.pk} from IP {ip_addr}"
)
def _create_object_logentry(self, popcorn, user, action, message):
popcorn_type = ContentType.objects.get(app_label='plan', model='popcorn')
LogEntry.objects.create(
user=user,
content_type=popcorn_type,
object_id=popcorn.pk,
object_repr=repr(popcorn),
action_flag=action,
change_message=message
)
def _get_account_status_logentry(self, popcorn, actions=[]):
popcorn_type = ContentType.objects.get(app_label='plan', model='popcorn')
entry = LogEntry.objects.filter(
content_type=popcorn_type, object_id=popcorn.pk, action_flag__in=actions
).order_by('action_time').last()
return entry
class ClassPERAdmin(ReadOnlyAdminMixin, admin.ModelAdmin):
list_display = ('event_tag', 'popcorn_pk', 'event_time', 'event_source')
list_filter = ('event_tag', 'event_source')
actions = None
def popcorn_pk(self, obj):
return obj.popcorn.pk
class ClassTAdmin(admin.ModelAdmin):
list_display = (
'pk',
'task_description',
'task_order',
'required_for_plan_completion',
'resets_at_benefit_period_end'
)
list_display_links = ('pk', 'task_description')
list_filter = ('required_for_plan_completion', 'resets_at_benefit_period_end')
class ClassTCAdmin(ReadOnlyAdminMixin, admin.ModelAdmin):
pass
class ClassPZAdmin(admin.StackedInline):
model = ClassPZ
show_change_link = True
extra = 3
class ClassBPForm(forms.ModelForm):
model = ClassBP
def __init__(self, *args, **kwargs):
"""
Set users_points_achivement to be read-only if it is True.
Admins are allowed to set a benefit period to use points-based achievement
but, once set, it cannot be unset for current or past benefit period in the admin
due to the potential impact on the clients' users.
"""
super().__init__(*args, **kwargs)
if self.instance and self.instance.uses_points_achievement and \
self.instance.start_date and self.instance.start_date <= date.today():
field = self.fields['uses_points_achievement']
field.widget.attrs['disabled'] = True
field.help_text = f"{field.help_text} (cannot unset for current or previous benefit periods)"
def clean(self):
super().clean()
# When submitting a form with a disabled, yet checked checkbox,
# the value in cleaned_data is False and reverts the expected setting
if self.instance.uses_points_achievement and \
'disabled' in self.fields['uses_points_achievement'].widget.attrs:
self.cleaned_data['uses_points_achievement'] = True
return self.cleaned_data
class ClassDBAdmin(admin.StackedInline):
model = ClassBP
form = VendorBenefitPeriodInlineForm
show_change_link = True
extra = 0
class ClassPSAdminForm(forms.ModelForm):
def clean(self):
super().clean()
if self.cleaned_data.get(
'vendor_type'
) == 'trial' and not self.cleaned_data.get('custom_signup_url_path'):
raise ValidationError({
"custom_signup_url_path": "Custom signup url path cannot be empty for trial vendor."
})
if self.cleaned_data.get('has_custom_url') and self.cleaned_data.get(
'enabled'
) and not self.cleaned_data.get('custom_signup_url_path'):
raise ValidationError({
"custom_signup_url_path": "Custom signup url path cannot be empty for 'Enabled' vendor with 'Has Custom URL' checked."
})
return self.cleaned_data
class IsVendorDefaultFilter(admin.SimpleListFilter):
"""Filter to show only Vendors where is_vendor is True by default."""
title = "is vendor"
parameter_name = 'is_vendor'
def lookups(self, request, module_admin):
"""Show popcorns associated with any (or no) vendor by default."""
return (
(None, 'Yes'),
('0', 'No'),
('all', 'All'),
)
def choices(self, changelist):
for lookup, title in self.lookup_choices:
yield {
'selected': self.value() == lookup,
'query_string': changelist.get_query_string({
self.parameter_name: lookup,
}, []),
'display': title,
}
def queryset(self, request, queryset):
"""Show vendors if they have is_vendor set to True or False."""
value = self.value()
if value is None:
value = '1'
if value in ['1', '0']:
return queryset.filter(is_vendor=value)
else:
return queryset
class ClassPCAdmin(admin.ModelAdmin):
LAST_WEEK_DAYS = 7
LAST_MONTH_DAYS = 30
LAST_QUARTER_DAYS = 91
BREAKDOWN_DATE_FORMAT = '%m/%d/%Y'
BREAKDOWN_DEFAULT_DAYS = 7
CALCULATIONS_HELPTEXT = 'Test accounts and disabled users are excluded from these calculations.'
change_form_template = 'admin/change_vendor_form.html'
form = VendorCandidateAdminForm
inlines = (VendorZipcodeAdmin, VendorBenefitPeriodAdmin)
list_display = (
'name',
'vendor_type',
'is_vendor',
'enabled',
'product',
)
list_filter = (
IsVendorDefaultFilter,
'enabled',
'vendor_type',
'product',
'partner_name',
'hipaa_release_required',
)
search_fields = (
'name',
'partner_name',
)
readonly_fields = (
'user_signups_last_week',
'user_signups_last_month',
'user_signups_last_quarter',
'user_signups_in_benefit_period',
'user_signups_in_contract_year',
)
group_readonly_fields = OrderedDict([
(
EXADMIN_GROUP_NAME,
readonly_fields + (
'name',
'slug',
'vendor_type',
'image',
'is_vendor',
'enabled',
'member_id_required',
'benefit_year_start',
'contract_year_start',
'partner_vendor_id',
'product',
)
),
(CLIENT_SUPPORT_GROUP_NAME, readonly_fields + ('slug', )),
(USER_SUPPORT_GROUP_NAME, readonly_fields + ('slug', )),
])
fieldsets = (
(
None,
{
'fields': (
'name',
'slug',
'vendor_type',
'image',
'is_vendor',
'enabled',
'display_in_dropdown',
'has_custom_url',
'custom_signup_url_path',
'custom_url_redirect_url',
'member_id_required',
'member_id_label',
'hipaa_release_required',
'is_covered_entity',
'is_identity_authority',
'is_email_authority',
'eligibility_check_required',
'benefit_year_start',
'contract_year_start',
'legal_name',
'short_legal_name',
'address',
'parent_vendor',
'partner_vendor_id',
'partner_name',
'partner_brand_name',
'product',
'minimum_number_of_chats',
'minimum_days_between_first_and_last_chat',
)
}
),
(
"Incentive",
{
'fields': (
'has_incentive',
'incentive_type',
'incentive_amount',
'incentive_frequency',
),
'description': "Incentive information governs how a user's incentive is presented in automatic emails."
}
),
(
"ClassPU signups info",
{
'fields': (
'user_signups_last_week',
'user_signups_last_month',
'user_signups_last_quarter',
'user_signups_in_benefit_period',
'user_signups_in_contract_year',
),
'description': CALCULATIONS_HELPTEXT
}
),
(
"Client reporting details", {
'fields': (
'exclude_from_client_benchmarks',
'client_benchmark_type',
),
}
),
("Pricing details", {
'fields': (
'pricing_structure',
'per_participant_minimum',
),
}),
(
"Dashboard custom fields",
{
'fields': (
'company',
'business_unit',
'demographic_group',
'medical_coverage',
'status',
'custom_1',
'custom_2',
'custom_3',
'custom_4',
'custom_5',
),
}
),
)
def get_readonly_fields(self, request, obj=None):
"""
Slug should be editable when adding a new vendor.
"""
readonly = super().get_readonly_fields(request, obj)
if not obj and CLIENT_SUPPORT_GROUP_NAME in request.user.groups.values_list(
'name', flat=True
):
readonly = tuple(itm for itm in readonly if itm != 'slug')
return readonly
@staticmethod
def _get_user_count_between_dates(obj, start, end):
users = obj.payees.filter(signup_date__range=(start, end))
users = users.exclude(get_test_account_query()).exclude(disabled=True)
return users.count()
def user_signups_last_week(self, obj):
end = timezone.now()
start = end - timedelta(days=self.LAST_WEEK_DAYS)
return self._get_user_count_between_dates(obj, start, end)
def user_signups_last_month(self, obj):
end = timezone.now()
start = end - timedelta(days=self.LAST_MONTH_DAYS)
return self._get_user_count_between_dates(obj, start, end)
def user_signups_last_quarter(self, obj):
end = timezone.now()
start = end - timedelta(days=self.LAST_QUARTER_DAYS)
return self._get_user_count_between_dates(obj, start, end)
def user_signups_in_benefit_period(self, obj):
if not obj.current_benefit_period:
return "No benefit period cut-off for this vendor"
start_benefit = obj.current_benefit_period_start_date
end_benefit = obj.current_benefit_period_end_date
return self._get_user_count_between_dates(obj, start_benefit, end_benefit)
def user_signups_in_contract_year(self, obj):
now = timezone.now()
contract_year = obj.contract_year_start
if not contract_year:
return "No contract year cut-off for this vendor"
# Completed in current contract year calculation
start_contract = contract_year.replace(year=now.year - 1)
end_contract = contract_year.replace(year=now.year)
return self._get_user_count_between_dates(obj, start_contract, end_contract)
@staticmethod
def _dates_in_last(days):
now = timezone.now()
for i in range(days):
yield (now - timedelta(days=i)).date()
def save_model(self, request, obj, form, change):
"""Update existing Service users if sponsorship-related fields changed on the vendor."""
super().save_model(request, obj, form, change)
fields = {
# Form field: Service custom attribute
'name': ProfileAttributes.COMPANY_NAME,
'is_vendor': ProfileAttributes.SPONSORED_STATUS,
'enabled': ProfileAttributes.SPONSORED_STATUS,
'product': ProfileAttributes.NRT
}
# There is an API call needed for each popcorn and each attribute in Service, so build
# the set of attributes to update with only what changed.
attributes_to_update = set(
attribute for f, attribute in fields.items() if f in form.changed_data
)
if change and any(attributes_to_update):
popcorns = ClassP.objects.exclude(service_id='').exclude(service_id__isnull=True
).filter(chosen_vendor=obj)
for popcorn in popcorns:
maybe_sync_object_to_service(popcorn, attributes_to_update)
user_signups_last_week.short_description = \
f"ClassPU signups in the last week ({LAST_WEEK_DAYS} days)"
user_signups_last_month.short_description = \
f"ClassPU signups in the last month ({LAST_MONTH_DAYS} days)"
user_signups_last_quarter.short_description = \
f"ClassPU signups in the last quarter ({LAST_QUARTER_DAYS} days)"
def has_add_permission(self, request):
return (
super(VendorCandidateAdmin, self).has_add_permission(request)
and not getattr(request, '_updating_object', False)
)
def has_change_permission(self, request, obj=None):
return (
super(VendorCandidateAdmin, self).has_change_permission(request, obj)
and not getattr(request, '_updating_object', False)
)
def has_delete_permission(self, request, obj=None):
return (
super(VendorCandidateAdmin, self).has_delete_permission(request, obj)
and not getattr(request, '_updating_object', False)
)
class ClassPPAdmin(admin.ModelAdmin):
model = ClassPP
fields = ('name', 'description', 'slug', 'includes_nrt', 'nrt_amount')
def get_readonly_fields(self, request, obj=None):
if getattr(obj, 'slug', None) == 'default':
# Don't allow users to edit the default instance's slug
return ['slug']
return super().get_readonly_fields(request, obj)
def has_delete_permission(self, request, obj=None):
# Don't allow deletion of the default instance
if getattr(obj, 'slug', None) == 'default':
return False
return super().has_delete_permission(request, obj)
def save_model(self, request, obj, form, change):
super().save_model(request, obj, form, change)
if change and 'includes_nrt' in form.changed_data:
popcorns = ClassP.objects.exclude(service_id='').exclude(
service_id__isnull=True
).filter(chosen_vendor__in=obj.vendorcandidate_set.all(), )
attributes = [ProfileAttributes.NRT]
for popcorn in popcorns:
maybe_sync_object_to_service(popcorn, attributes)
class ClassGPGCAdmin(admin.ModelAdmin):
model = GPGConfiguration
list_display = ('slug', )
class ClassPRSEForm(forms.ModelForm):
class Meta:
widgets = {
'username': forms.TextInput(attrs={'size': '40'}),
'password': forms.PasswordInput(
render_value=True,
# Tell lastpass not to autofill this because it may cause sneaky, unseen changes
# that would make the endpoint fail to connect.
attrs={
'data-lpignore': 'true', 'size': '40'
}
),
'filename': forms.TextInput(attrs={'size': '80'}),
'summary_filename': forms.TextInput(attrs={'size': '80'}),
}
class ClassPRSEAdmin(admin.ModelAdmin):
model = VendorReportSFTPEndpoint
change_form_template = "admin/change_vendorreportsftpendpoint_form.html"
fields = (
'slug',
'vendor',
'host',
'port',
'username',
'password',
'host_key_contents',
'verify_host_key',
'filename',
'filename_separator',
'summary_filename',
'remote_path',
'gpg_configuration',
)
list_display = ('slug', 'vendor', 'host', 'port')
def get_form(self, request, obj=None, **kwargs):
kwargs['form'] = VendorReportSFTPEndpointForm
return super().get_form(request, obj, **kwargs)
def response_change(self, request, obj):
if "_test-connection" in request.POST:
try:
obj.test_connection()
except ConnectionError as err:
messages.error(request, err)
else:
messages.success(request, "Connection success!")
return HttpResponseRedirect(".")
return super().response_change(request, obj)
class VendorApiKeyAdmin(admin.ModelAdmin):
list_display = (
'slug',
'enabled',
'expires',
'date_created',
)
list_filter = ('enabled', )
def log_addition(self, request, object, message):
try:
message = f"{message} [added from {request.META['REMOTE_ADDR']}]"
except KeyError:
message = f"{message} [added from undetermined IP address]"
self._applog_vendor_api_key_action(object, request, "added")
super().log_addition(request, object, message)
def log_change(self, request, object, message):
try:
message = f"{message} [changed from {request.META['REMOTE_ADDR']}]"
except KeyError:
message = f"{message} [changed from undetermined IP address]"
self._applog_vendor_api_key_action(object, request, "changed")
super().log_change(request, object, message)
def log_deletion(self, request, object, object_repr):
self._applog_vendor_api_key_action(object, request, "deleted")
super().log_deletion(request, object, object_repr)
def _applog_vendor_api_key_action(self, vendor_key, request, action_name):
"""For compliance reasons, we want to know who has accessed vendor_key data in app logs"""
user = request.user
ip_addr = request.META.get('REMOTE_ADDR', "undetermined IP address")
user_groups = ', '.join(user.groups.values_list('name', flat=True)) or "no groups"
if settings.ALWAYS_LOG_ADMIN_ACTIVITY:
logger.info(
f"VENDOR KEY ACCESS: {user.username} in groups {user_groups} {action_name} "
f"vendor_key {vendor_key.pk} from IP {ip_addr}"
)
class ClassHPRPAdmin(ReadOnlyAdminMixin, admin.ModelAdmin):
list_display = (
'hipaa_release_revision',
'vendor',
'vendor_hipaa_text_fingerprint',
'language_code',
)
ordering = (
'-hipaa_release_revision',
'vendor__name',
)
class ClassEPNAdmin(admin.ModelAdmin):
list_display = (
'id',
'is_active',
'dismissible',
'shortened_body',
)
list_display_links = list_display
fields = (
'is_active',
'dismissible',
'body',
'kind',
'created',
'updated',
)
readonly_fields = (
'created',
'updated',
'kind',
)
def shortened_body(self, obj):
return BeautifulSoup(obj.body).text[:100]
shortened_body.short_description = 'Body'
class ClassRRAdmin(admin.ModelAdmin):
list_display = (
'reason',
'slug',
'active',
)
def get_readonly_fields(self, request, obj=None):
"""Make the slug readonly if the object already exists.
But, let someone set it in the admin at creation time. This is helpful
because these slugs are sent to BEXSS and used to refer to reasons by
something other than PK."""
if obj:
return ["slug"]
return super().get_readonly_fields(request, obj=obj)
class TokenAdmin(admin.ModelAdmin):
list_display = (
'get_key',
'user',
'created',
)
fields = (
'user',
'key',
)
ordering = ('-created', )
raw_id_fields = ('user', )
def get_key(self, obj):
return obj.key[:8] + '***'
def changelist_view(self, request, extra_context=None):
if request.method == 'GET':
user = request.user
user_groups = ', '.join(user.groups.values_list('name', flat=True)) or "no groups"
ip_addr = request.META.get('REMOTE_ADDR', "undetermined IP address")
logger.info(
f"TOKEN ACCESS: {user.username} in groups {user_groups} viewed "
f"token list from IP {ip_addr}"
)
return super().changelist_view(request, extra_context)
def changeform_view(self, request, object_id=None, form_url='', extra_context=None):
if object_id is not None and request.method == 'GET':
obj = Token.objects.get(key=object_id)
self._applog_token_action(obj, request, "viewed")
return super().changeform_view(request, object_id, form_url, extra_context)
def log_addition(self, request, object, message):
try:
message = f"{message} [added from {request.META['REMOTE_ADDR']}]"
except KeyError:
message = f"{message} [added from undetermined IP address]"
self._applog_token_action(object, request, "added")
super().log_addition(request, object, message)
def log_change(self, request, object, message):
try:
message = f"{message} [changed from {request.META['REMOTE_ADDR']}]"
except KeyError:
message = f"{message} [changed from undetermined IP address]"
self._applog_token_action(object, request, "changed")
super().log_change(request, object, message)
def _applog_token_action(self, token, request, action_name):
"""For compliance reasons, we want to know who has accessed token data in app logs"""
user = request.user
ip_addr = request.META.get('REMOTE_ADDR', "undetermined IP address")
user_groups = ', '.join(user.groups.values_list('name', flat=True)) or "no groups"
if settings.ALWAYS_LOG_ADMIN_ACTIVITY:
logger.info(
f"TOKEN ACCESS: {user.username} in groups {user_groups} {action_name} "
f"token {token.key[:8]} from IP {ip_addr}"
)
class PopcornActivationForm(forms.ModelForm):
account_annotation = forms.CharField(
label="Why are you activating this account?",
help_text="This annotation will be saved in the account history.",
widget=forms.Textarea,
)
class Meta:
model = ClassP
fields = ['account_annotation']
def change_message(self):
annotation = self.cleaned_data['account_annotation']
return f"activated: {annotation}"
def save(self, commit=True):
popcorn = super().save(commit=False)
popcorn.account_status = ClassP.ACTIVE
popcorn.disabled = False
if commit:
popcorn.save()
return popcorn
class PopcornDeactivationForm(forms.ModelForm):
CHOICES = (
(ClassP.INACTIVE, 'Deactivate'),
(ClassP.BANNED, 'Ban'),
)
account_status = forms.ChoiceField(
choices=CHOICES,
label='Would you like to deactivate or ban the account?',
)
account_annotation = forms.CharField(
label="Why are you deactivating this account?",
help_text="This annotation will be saved in the account history.",
required=True,
widget=forms.Textarea,
)
class Meta:
model = ClassP
fields = ['account_status', 'account_annotation']
def _banned(self):
return self.cleaned_data['account_status'] == ClassP.BANNED
def action_flag(self):
return PopcornAdmin.BANNED if self._banned() else PopcornAdmin.DEACTIVATED
def action_string(self):
return 'banned' if self._banned() else 'deactivated'
def change_message(self):
action = self.action_string()
annotation = self.cleaned_data['account_annotation']
return f"{action}: {annotation}"
def save(self, commit=True):
popcorn = super().save(commit=False)
popcorn.disabled = True
popcorn.email_optin = False
popcorn.email_optin_announcements = False
popcorn.email_optin_updates = False
popcorn.email_optin_newsletter = False
popcorn.email_optin_reminders = False
popcorn.email_optin_community = False
if commit:
popcorn.save()
ClasPER.record_email_opt_out(popcorn, 'Admin')
if popcorn.email_provider_id:
unsubscribe_object_from_email_provider_lists_task.delay(popcorn.popcorn_id)
return popcorn
class ClassPUAdmin(OrigUserAdmin):
list_display = ('username', 'is_staff')
def log_change(self, request, object, message):
user = request.user
ip_addr = request.META.get('REMOTE_ADDR', "undetermined IP address")
user_groups = ', '.join(user.groups.values_list('name', flat=True)) or "no groups"
logger.info(
f"DJANGOUSER ACCESS: {user.username} in groups {user_groups} changed "
f"django user {object.username} (pk {object.pk}) from IP {ip_addr}"
)
super().log_change(request, object, message)
class ClassAAAdmin(admin.ModelAdmin):
class Meta:
model = ClassAA
class ClassAAOAdmin(admin.ModelAdmin):
class Meta:
model = ClassAAO
class ClassAMAdmin(admin.ModelAdmin):
ordering = ('days_abstinent', 'relapse_milestone')
class Meta:
model = ClassAM
class ClassGSAdmin(admin.ModelAdmin):
list_display = (
'name',
'slug',
'language_code',
'external_service_id',
'is_phi_safe',
'phi_safe_equivalent'
)
model = ClassGS
@admin.register(ClassV)
class ClassVAdmin(admin.ModelAdmin):
list_display = ('popcorn', 'created', 'modified', 'your_vision')
list_filter = ('created', 'modified')
readonly_fields = ('popcorn', )
@admin.register(ClassACT)
class ActivityAdmin(admin.ModelAdmin):
readonly_fields = ('slug', )
fields = ('slug', 'points')
# Allow editing points directly from the list view
list_display = ('slug', 'points')
list_editable = ('points', )
# Don't allow deleting activities
actions = None
def has_add_permission(self, request, obj=None):
return False
admin.site.register(ClasPER, ClassPERAdmin)
admin.site.register(ClassT, ClassTAdmin)
admin.site.register(ClassTC, ClassTCAdmin)
admin.site.register(ClassP, ClassPAdmin)
admin.site.register(ClassPU, ClassPUAdmin)
admin.site.register(ClassPC, ClassPCAdmin)
admin.site.register(ClassPP, ClassPPAdmin)
admin.site.register(ClassPAK, ClassPAKAdmin)
admin.site.register(ClassPHRP, ClassPHRPAdmin)
admin.site.register(ClassPRSE, ClassPRSEAdmin)
admin.site.register(ClassGPGC, ClassGPGCAdmin)
admin.site.register(ClassEPN, ClassEPNAdmin)
admin.site.register(ClassRR, ClassRRAdmin)
admin.site.register(Token, TokenAdmin)
admin.site.register(ClassAA, ClassAAAdmin)
admin.site.register(ClassAAO, ClassAAOAdmin)
admin.site.register(ClassAM, ClassAMAdmin)
admin.site.register(ClassGS, ClassGSAdmin)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment