Skip to content

Instantly share code, notes, and snippets.

@lensraster
Created June 12, 2018 08:30
Show Gist options
  • Save lensraster/ec833f52949e07564e983e689f1685c6 to your computer and use it in GitHub Desktop.
Save lensraster/ec833f52949e07564e983e689f1685c6 to your computer and use it in GitHub Desktop.
Parting's businesses/models.py
import os
import uuid
import urllib
import json
from urlparse import urlparse
from django.urls import reverse
from django.db import models
from django.db.models import Avg, Q
from django.db.models.functions import Length
from django.contrib.gis.measure import D
from django.conf import settings
from django.contrib.gis.db.models import PointField
from django.contrib.gis.geos import fromstr
from django.utils.text import slugify
from django.utils.functional import cached_property
from django_extensions.db.models import TimeStampedModel, TitleSlugDescriptionModel, TitleDescriptionModel
from apps.utils.states import states
from apps.utils.urlsigner import sign_url
def get_icon_upload_path(instance, filename):
    """Return the storage path for an uploaded business icon.

    Only the extension of ``filename`` is kept; the base name is replaced
    by a freshly generated UUID4.

    Args:
        instance: object whose ``id`` becomes the second path segment.
        filename: name of the uploaded file.

    Returns:
        ``business/<id>/business_icons/<uuid4><extension>``.
    """
    extension = os.path.splitext(filename)[1]
    segments = ['business', str(instance.id), 'business_icons', str(uuid.uuid4()) + extension]
    return '/'.join(segments)
def get_hero_upload_path(instance, filename):
    """Return the storage path for an uploaded business hero image.

    Only the extension of ``filename`` is kept; the base name is replaced
    by a freshly generated UUID4.

    Args:
        instance: object whose ``id`` becomes the second path segment.
        filename: name of the uploaded file.

    Returns:
        ``business/<id>/business_heros/<uuid4><extension>``.
    """
    extension = os.path.splitext(filename)[1]
    segments = ['business', str(instance.id), 'business_heros', str(uuid.uuid4()) + extension]
    return '/'.join(segments)
class Business(TimeStampedModel, TitleSlugDescriptionModel):
    """A business listing with contact data, packages, price estimates and reviews.

    Related rows are reached through ``custom_packages``,
    ``businessservicepackage_set``, ``businessservice_set``, ``review_set``,
    ``image_set`` and ``pricelookup``. The preferred address is exposed by
    the cached ``address`` property.
    """

    BUSINESS_TYPE_CHOICES = (
        (1, 'funeral home'),
    )
    GPL_RECEIVED = 'gpl'
    ON_PHONE = 'phone'
    GPL_SOURCE_CHOICES = (
        (GPL_RECEIVED, 'GPL Received'),
        (ON_PHONE, 'Confirmed on Phone')
    )

    business_type = models.PositiveSmallIntegerField(choices=BUSINESS_TYPE_CHOICES, default=1)
    phone = models.CharField('Phone Number', max_length=32, blank=True, null=True)
    icon = models.ImageField('Icon', upload_to=get_icon_upload_path, blank=True, null=True)
    hero = models.ImageField('Hero', upload_to=get_hero_upload_path, blank=True, null=True)
    # When set, get_phone() returns this instead of ``phone``.
    tracking_number = models.CharField('Tracking Number', max_length=255, blank=True, null=True)
    url = models.CharField(max_length=1024, blank=True, null=True)
    seo_url = models.CharField(max_length=255)
    full_slug = models.CharField(max_length=255, default='')
    verified = models.BooleanField(default=False)
    premium = models.BooleanField(default=False)
    parting_pro_customer = models.BooleanField(default=False)
    parting_pro_shop_url = models.CharField(max_length=255, default='', blank=True, null=True)
    claimed = models.IntegerField(default=0)
    is_active = models.BooleanField(default=True)
    # Body-donation businesses keep package items that have no price
    # (see serialize_packages).
    is_body_donation = models.BooleanField(default=False)
    # When set, replaces the generated text of ``display_address``.
    address_display_override = models.CharField(max_length=100, null=True, blank=True)
    gpl_date = models.DateField(null=True, blank=True)
    gpl_source = models.CharField(choices=GPL_SOURCE_CHOICES, null=True, blank=True, max_length=20)

    class Meta:
        verbose_name_plural = 'Businesses'

    def __unicode__(self):
        return self.title

    @staticmethod
    def autocomplete_search_fields():
        """Return the field lookups to search when autocompleting a business.

        NOTE(review): presumably consumed by an admin autocomplete
        integration; the caller is not visible in this module.
        """
        return ("title__icontains",)

    def serialize_packages(self, queryset, services, dedupe_filter_params=None):
        """Turn package rows into plain dictionaries with priced line items.

        Args:
            queryset: iterable of packages exposing ``title``, ``has_burial``,
                ``has_service``, ``has_viewing`` and an ``items`` related
                manager.
            services: mapping of service slug to a dict holding ``'price'``,
                as produced by ``get_service_pricing``.
            dedupe_filter_params: optional list. When a list is supplied
                (even an empty one) one ``has_*`` dict per package is
                appended to it so the caller can exclude equivalent packages.

        Returns:
            A list with one dict per package: its flags, discount, line items
            under ``'services'`` and ``'total'`` (sum of item prices minus
            the discount).
        """
        packages = []
        for package in queryset:
            package_items = package.items.select_related('service').order_by('id')
            # Compare against None, not truthiness: get_packages() passes an
            # empty list to be filled, which a truthiness test would skip.
            if dedupe_filter_params is not None:
                dedupe_filter_params.append({
                    'has_burial': package.has_burial,
                    'has_service': package.has_service,
                    'has_viewing': package.has_viewing,
                })
            package_data = {
                'title': package.title,
                'has_burial': package.has_burial,
                'has_service': package.has_service,
                'has_viewing': package.has_viewing,
                # Only custom packages define package_discount.
                'discount': getattr(package, 'package_discount', 0),
                'has_non_extended_services': False,  # May be overwritten below
                'has_extended_services': False,  # May be overwritten below
                'is_body_donation': self.is_body_donation,
                'services': [],
            }
            services_total = 0
            for item in package_items:
                item_data = {
                    'id': item.service_id,
                    'title': item.get_display_name,
                    # Only custom package items define ``included``.
                    'included': getattr(item, 'included', False),
                    'extended': False,  # May be overwritten below
                }
                if item.service:
                    entry = services.get(item.service.slug)
                    price = entry['price'] if entry else 0
                    # Items with no price are invalid, except for body donation.
                    if not price and not self.is_body_donation:
                        continue
                    item_data['price'] = price + item.price_modifier
                    services_total += item_data['price']
                    if item.service.is_extended_service:
                        item_data['extended'] = True
                        package_data['has_extended_services'] = True
                    else:
                        package_data['has_non_extended_services'] = True
                else:
                    # Items without a service carry no 'price' key.
                    package_data['has_non_extended_services'] = True
                package_data['services'].append(item_data)
            package_data['total'] = services_total - package_data['discount']
            packages.append(package_data)
        return packages

    def get_packages(self):
        """Return serialized custom packages followed by the standard ones.

        Standard packages whose burial/service/viewing combination matches a
        custom package are excluded.
        """
        services = self.get_service_pricing(only_itemizable=True)
        dedupe_filter_params = []
        packages = self.serialize_packages(
            self.custom_packages.all(), services, dedupe_filter_params=dedupe_filter_params)
        business_service_packages = self.businessservicepackage_set.all()
        for combo in dedupe_filter_params:
            business_service_packages = business_service_packages.exclude(**combo)
        packages += self.serialize_packages(business_service_packages, services)
        return packages

    def get_package(self, **combo):
        """Return the first serialized package matching ``combo``.

        Custom packages are searched first, then standard packages. Returns
        an empty dict when nothing matches.
        """
        queryset = self.custom_packages.filter(**combo)
        if not queryset:
            queryset = self.businessservicepackage_set.filter(**combo)
        packages_data = self.serialize_packages(queryset, self.get_service_pricing(only_itemizable=True))
        return packages_data[0] if packages_data else {}

    def get_service_pricing(self, only_itemizable=False):
        """Map service slug to ``{'price': ..., 'name': ...}`` for this business.

        Args:
            only_itemizable: when true, services whose ``is_itemizable`` flag
                is false are left out.
        """
        pricing = {}
        for business_service in self.businessservice_set.all():
            service = Service.get_item(business_service.service_id)
            if only_itemizable and not service.is_itemizable:
                continue
            pricing[service.slug] = {'price': business_service.price, 'name': service.title}
        return pricing

    def get_review_set(self):
        """Return the reviews whose text is at least 50 characters long."""
        return [review for review in self.review_set.all() if len(review.text) >= 50]

    @property
    def url_clean(self):
        """Network location (host part) of ``url``, or None when no URL is set."""
        if not self.url:
            return None
        return urlparse(self.url).netloc

    def get_phone(self):
        """Return the tracking number when set, otherwise the plain phone."""
        return self.tracking_number or self.phone

    @property
    def description_length(self):
        """Length of the description; 0 when the description is empty or None."""
        return len(self.description or '')

    @property
    def image_count(self):
        return self.image_set.count()

    @property
    def total_reviews(self):
        return self.review_set.count()

    @property
    def avg_review_queried(self):
        """Average review rating computed by the database; 0 without reviews."""
        avg = self.review_set.aggregate(Avg('rating'))['rating__avg']
        return avg or 0

    @property
    def gmap_directions(self):
        # NOTE(review): the address is appended without URL quoting.
        return 'https://maps.google.com?daddr=' + self.full_address

    @property
    def slugify_name(self):
        """Slug built from ``seo_url`` and the zip code of the main address."""
        return slugify(self.seo_url + '-' + self.address.zip_postal_code)

    def get_street_view_image(self, size="360x240"):
        """Return a signed Google Street View image URL for the full address.

        Args:
            size: image size as ``"<width>x<height>"``.
        """
        location = urllib.quote_plus(self.full_address.encode('utf-8'), safe=':/'.encode('utf-8'))
        map_url = 'https://maps.googleapis.com/maps/api/streetview?size={}&location={}&key={}'.format(
            size, location, settings.GOOGLE_API_KEY)
        return sign_url(map_url, settings.GOOGLE_URL_SIGN_SECRET)

    @property
    def street_view_icon(self):
        return self.get_street_view_image()

    @property
    def street_view_icon_mobile(self):
        return self.get_street_view_image(size="144x96")

    @property
    def large_streetview_image(self):
        return self.get_street_view_image(size="640x426")

    @property
    def get_google_static_image(self):
        """Unsigned Google static-map URL with a marker at the full address."""
        location = urllib.quote_plus(self.full_address.encode('utf-8'), safe=':/'.encode('utf-8'))
        return 'https://maps.google.com/maps/api/staticmap?zoom=13&size=300x190&markers=icon:http://parting-web-cms.herokuapp.com/static/img/parting-marker.png|' + location

    @property
    def get_primary_image(self):
        """URL of the first image, or the large street-view image as fallback.

        A single ``first()`` query replaces the earlier count-then-fetch,
        which could return None if the count and the fetch disagreed.
        """
        first = self.image_set.first()
        if first:
            return first.full_url
        return self.large_streetview_image

    @property
    def full_address(self):
        """Street, city and state of the main address, space separated."""
        address = self.address
        return " ".join(filter(None, [address.address, address.city, address.state_province_region]))

    def get_absolute_url(self):
        """Return the public URL of this business, or None without an address."""
        address = self.address
        # ``address`` is a property and therefore always present as an
        # attribute; what can be missing is the row itself.
        if address is None:
            return None
        return reverse('businesses-business', kwargs={
            'city': address.slugify_city,
            'state': address.slugify_state,
            'business': self.slugify_name,
        })

    @property
    def avg_cost(self):
        return self.pricelookup.avg_cost

    @property
    def avg_cremation_memorial(self):
        return self.pricelookup.memorial

    def _estimate_total(self, required, optional=(), flat_addon=None):
        """Sum service prices for an estimate.

        Args:
            required: service slugs that must all have a non-zero price,
                otherwise the estimate is 0.
            optional: service slugs added when priced, counted as 0 otherwise.
            flat_addon: amount added to a positive total. ``None`` means the
                sum of ``settings.DEFAULT_PRICES`` values.

        Returns:
            0 when a required price is missing, else the total.
        """
        prices = self.get_service_pricing()

        def price_of(slug):
            return prices[slug]['price'] if slug in prices else 0

        required_prices = [price_of(slug) for slug in required]
        if any(price == 0 for price in required_prices):
            return 0
        total = sum(required_prices) + sum(price_of(slug) for slug in optional)
        if total > 0:
            if flat_addon is None:
                flat_addon = sum(settings.DEFAULT_PRICES.values())
            total += flat_addon
        return total

    @property
    def calc_avg_cost(self):
        """Estimate: basic services, transfer, viewing, funeral ceremony (+ embalming)."""
        return self._estimate_total(
            required=('basic_services', 'transfer_remains', 'facilities_and_staff_viewing',
                      'facilities_and_staff_funeral_ceremony'),
            optional=('embalming',),
        )

    @property
    def calc_avg_cost_no_service(self):
        """Estimate: basic services, transfer, viewing (+ embalming)."""
        return self._estimate_total(
            required=('basic_services', 'transfer_remains', 'facilities_and_staff_viewing'),
            optional=('embalming',),
        )

    @property
    def calc_traditional_no_viewing(self):
        """Estimate: basic services, transfer, funeral ceremony (+ embalming)."""
        return self._estimate_total(
            required=('basic_services', 'transfer_remains', 'facilities_and_staff_funeral_ceremony'),
            optional=('embalming',),
        )

    @property
    def calc_cremation_memorial_no_viewing(self):
        """Estimate: basic services, transfer, memorial service, plus a flat 300."""
        return self._estimate_total(
            required=('basic_services', 'transfer_remains', 'facilities_and_staff_memorial_service'),
            flat_addon=300,
        )

    @property
    def calc_avg_cremation_memorial_no_service(self):
        """Estimate: basic services, transfer, viewing, plus a flat 300."""
        return self._estimate_total(
            required=('basic_services', 'transfer_remains', 'facilities_and_staff_viewing'),
            flat_addon=300,
        )

    @property
    def calc_avg_cremation_memorial(self):
        """Estimate: basic services, transfer, viewing, memorial service, plus a flat 300."""
        return self._estimate_total(
            required=('basic_services', 'transfer_remains', 'facilities_and_staff_viewing',
                      'facilities_and_staff_memorial_service'),
            flat_addon=300,
        )

    @cached_property
    def address(self):
        """The address flagged ``is_main`` if any, else another one, else None."""
        return Address.objects.filter(business=self).order_by('-is_main').first()

    @cached_property
    def display_address(self):
        """Text or HTML snippet describing where the business operates.

        Unicode templates are used because address values come from the
        database as unicode; formatting them into a byte string fails on
        non-ASCII text under Python 2.
        """
        if self.address_display_override:
            return self.address_display_override
        if self.address.state_wide:
            # state_full falls back to the raw code for unknown states
            # instead of raising KeyError.
            return u'Serving the entire state of {}'.format(self.address.state_full)
        # NOTE(review): values are interpolated into HTML without escaping.
        return u'<a href="{}" target="_blank">{}</a>'.format(self.gmap_directions, self.address.short_display)

    def to_json(self, as_dict=False):
        """Serialize id, location, title, URL, icon and looked-up prices.

        Args:
            as_dict: return the dict itself instead of a JSON string.

        Returns:
            The dict, or its JSON dump with single quotes backslash-escaped.
        """
        data = {
            'id': self.id,
            'location': {
                'lat': float(self.address.latitude),
                'lng': float(self.address.longitude)
            },
            'title': self.title,
            'url': self.get_absolute_url(),
            'icon': '{}{}'.format(settings.MEDIA_URL, self.icon) if self.icon else self.street_view_icon,
        }
        lookup = self.pricelookup
        data['prices'] = {
            'avg_cost': ('Avg. Funeral', int(lookup.avg_cost)),
            'memorial': ('Cremation Memorial', int(lookup.memorial)),
            'direct_burial': ('Direct Burial', int(lookup.direct_burial)),
            # NOTE(review): the key is misspelled; kept because consumers
            # outside this module may read it.
            'direct_cremtion': ('Direct Cremation', int(lookup.direct_cremation)),
            # Previously this entry reported traditional_no_viewing.
            'memorial_no_viewing': ('Memorial No Viewing', int(lookup.memorial_no_viewing)),
            'traditional_no_viewing': ('Traditional No Viewing', int(lookup.traditional_no_viewing)),
        }
        if not as_dict:
            data = json.dumps(data).replace("'", "\\'")
        return data
class PriceLookup(models.Model):
    """Stored price figures for one business, one row per business.

    Read through ``Business.pricelookup``. Nothing in this module writes
    these columns.
    """

    business = models.OneToOneField(to=Business, on_delete=models.CASCADE)

    traditional = models.FloatField(default=0)
    memorial = models.FloatField(default=0)
    direct_burial = models.FloatField(default=0)
    direct_cremation = models.FloatField(default=0)
    avg_cost = models.FloatField(default=0)
    version = models.FloatField(default=0)

    # Variants without a viewing or without a service.
    traditional_no_viewing = models.FloatField(default=0)
    memorial_no_viewing = models.FloatField(default=0)
    memorial_no_service = models.FloatField(default=0)
    traditional_no_service = models.FloatField(default=0)
class Address(TimeStampedModel):
    """Postal address of a business together with its service area.

    ``point`` is recomputed from ``longitude``/``latitude`` on every save.
    Saving an address with ``is_main`` set clears the flag on the other
    addresses of the same business.
    """

    business = models.ForeignKey(Business, on_delete=models.CASCADE, related_name='addresses')
    address = models.CharField('Street Address/P.O. box/Company Name/etc.', max_length=255, blank=True, null=True)
    address2 = models.CharField('Apartment/Suite/Unit/Building/Floor/etc.', max_length=255, blank=True, null=True)
    city = models.CharField('City', max_length=100, blank=True, null=True)
    state_province_region = models.CharField('State/Province/Region', max_length=100, blank=True, null=True)
    zip_postal_code = models.CharField('Zip/Postal Code', max_length=100, blank=True, null=True)
    country = models.CharField('Country', max_length=100, blank=True, null=True)
    latitude = models.DecimalField(max_digits=9, decimal_places=6, default=0)
    longitude = models.DecimalField(max_digits=9, decimal_places=6, default=0)
    service_range = models.IntegerField(choices=settings.SERVICE_RANGE_CHOICES, default=settings.SERVICE_RANGE_VALUES[0])
    state_wide = models.BooleanField(default=False)
    is_main = models.BooleanField(default=False)
    point = PointField(srid=4326)

    class Meta:
        verbose_name_plural = 'Addresses'

    @staticmethod
    def get_base_query(point_data):
        """Return addresses of active businesses that serve the given point.

        An address matches when it is state-wide in the point's state, or
        when the point lies within the address's own ``service_range``
        (in miles).

        Args:
            point_data: mapping with ``'state_abbrev'`` and ``'db_repr'``
                (the geometry used for the distance lookup).
        """
        distance_filter = Q(state_wide=True, state_province_region=point_data['state_abbrev'])
        for range_value in settings.SERVICE_RANGE_VALUES:
            distance_filter |= Q(point__distance_lte=(point_data['db_repr'], D(mi=range_value)), service_range=range_value)
        return Address.objects.select_related('business').select_related('business__pricelookup') \
            .prefetch_related('business__businessservice_set') \
            .prefetch_related('business__businessservicepackage_set') \
            .prefetch_related('business__review_set') \
            .filter(distance_filter, business__is_active=True)

    @staticmethod
    def unique(queryset):
        """Return the first item per business, preserving input order."""
        seen_business_pks = set()  # set membership keeps this linear
        unique_list = []
        for item in queryset:
            business_pk = item.business.pk
            if business_pk not in seen_business_pks:
                seen_business_pks.add(business_pk)
                unique_list.append(item)
        return unique_list

    @property
    def short_display(self):
        """Comma-separated street/city/state followed by the zip code.

        Empty parts are skipped, so a missing zip code no longer renders as
        the text "None".
        """
        locality = ", ".join(filter(None, [self.address, self.address2, self.city, self.state_province_region]))
        return " ".join(filter(None, [locality, self.zip_postal_code]))

    @property
    def slugify_city(self):
        return slugify(self.city)

    @property
    def slugify_state(self):
        return slugify(self.state_province_region).upper()

    @property
    def state_full(self):
        """Full state name from ``states``; the raw value when unknown."""
        return states.get(self.state_province_region, self.state_province_region)

    def get_point(self):
        """Build an SRID 4326 point from ``longitude`` and ``latitude``."""
        return fromstr('POINT({0} {1})'.format(self.longitude, self.latitude), srid=4326)

    def save(self, *args, **kwargs):
        """Refresh ``point``, keep ``is_main`` exclusive per business, then save."""
        self.point = self.get_point()
        if self.is_main:
            self.__class__.objects.filter(business=self.business).exclude(pk=self.pk).update(is_main=False)
        return super(Address, self).save(*args, **kwargs)
class Service(TimeStampedModel, TitleDescriptionModel):
    """A service a business can price, cached per process by primary key."""

    # Class-level cache filled by get_item(): {pk: Service}. None until the
    # first lookup.
    items = None

    @staticmethod
    def get_item(id):
        """Return the cached Service with primary key ``id``.

        The cache is loaded on first use and reloaded once when ``id`` is
        not in it, so services created after the first load are found
        instead of raising. Field changes on already cached rows are still
        not picked up.

        Raises:
            KeyError: no service with that primary key exists.
        """
        cache = Service.items
        if cache is None or id not in cache:
            cache = Service.items = Service.objects.all().in_bulk()
        return cache[id]

    ordinal = models.IntegerField()
    slug = models.CharField(max_length=100, blank=True, null=True)
    short_name = models.CharField(max_length=100, blank=True, null=True)
    is_itemizable = models.BooleanField(default=True, help_text='Will this show up as an itemizable service?')
    is_extended_service = models.BooleanField(default=False, help_text='Will this show up under AVG prices?')
    is_price_locked = models.BooleanField(default=False, help_text='Can the user toggle this cost off?')
    is_optional = models.BooleanField(default=False, help_text='Can this services cost be toggled off or on?')

    def __unicode__(self):
        return self.title
class BusinessService(TimeStampedModel):
    """The price one business charges for one service."""

    business = models.ForeignKey(to=Business, on_delete=models.CASCADE)
    service = models.ForeignKey(to=Service, on_delete=models.CASCADE)
    price = models.FloatField()
class Review(TimeStampedModel):
    """A rated text review of a business."""

    REVIEW_RATING_CHOICES = (
        (1, '1 Star'),
        (2, '2 Star'),
        (3, '3 Star'),
        (4, '4 Star'),
        (5, '5 Star'),
    )

    business = models.ForeignKey(to=Business, on_delete=models.CASCADE)
    first_name = models.CharField(verbose_name='First Name', max_length=255)
    last_name = models.CharField(verbose_name='Last Name', max_length=255)
    text = models.CharField(max_length=2048)
    rating = models.PositiveSmallIntegerField(choices=REVIEW_RATING_CHOICES, default=3)

    @property
    def display_name(self):
        """First name, a space, then the last-name initial with a period (if any)."""
        if self.last_name:
            initial = self.last_name[:1] + "."
        else:
            initial = ""
        return " ".join([self.first_name, initial])

    @property
    def display_date(self):
        """Creation date formatted like ``Jun 12, 2018``."""
        created = self.created
        return created.strftime('%b %d, %Y')
class BusinessServicePackageBase(TimeStampedModel, TitleDescriptionModel):
    """Abstract base for packages, described by three boolean flags."""

    has_burial = models.BooleanField(
        default=False,
        help_text='Is a burial and not a cremation',
    )
    has_service = models.BooleanField(
        default=False,
        help_text='Service or No Service',
    )
    has_viewing = models.BooleanField(
        default=False,
        help_text='Wake/Viewing or not viewing',
    )

    class Meta:
        abstract = True
class BusinessServicePackage(BusinessServicePackageBase):
    """A package that can be attached to any number of businesses."""

    business = models.ManyToManyField(to=Business)

    def __unicode__(self):
        """Use the package title as its text representation."""
        title = self.title
        return title
class CustomBusinessServicePackage(BusinessServicePackageBase):
    """A package owned by a single business, with an optional discount."""

    business = models.ForeignKey(Business, on_delete=models.CASCADE, related_name='custom_packages')
    # Subtracted from the summed item prices in Business.serialize_packages.
    package_discount = models.IntegerField(default=0)

    def __unicode__(self):
        # A unicode template is required: formatting unicode values into a
        # byte-string template raises UnicodeEncodeError on Python 2 as soon
        # as the title or business name contains non-ASCII characters.
        return u"{} ({})".format(self.title, self.business)
class BusinessServicePackageItemBase(TimeStampedModel):
    """Abstract base for a line item of a package."""

    # Optional here; BusinessServicePackageItem redeclares it as required.
    service = models.ForeignKey(Service, on_delete=models.CASCADE, null=True, blank=True)
    name_override = models.CharField(max_length=100, blank=True, null=True)
    # Added to the service price when the package is serialized.
    price_modifier = models.FloatField(default=0)
    is_locked = models.BooleanField(default=False)

    @property
    def get_display_name(self):
        """Name shown for the item.

        Returns ``name_override`` when set, otherwise the service's
        ``short_name``. An item with neither an override nor a service
        yields an empty string instead of raising AttributeError.
        """
        if self.name_override:
            return self.name_override
        if self.service is None:
            return ''
        return self.service.short_name

    class Meta:
        abstract = True
class BusinessServicePackageItem(BusinessServicePackageItemBase):
    """Line item of a standard package; the service is mandatory here."""

    service = models.ForeignKey(to=Service, on_delete=models.CASCADE)
    business_service_package = models.ForeignKey(
        to=BusinessServicePackage,
        related_name='items',
        on_delete=models.CASCADE,
    )
class CustomBusinessServicePackageItem(BusinessServicePackageItemBase):
    """Line item of a custom package, with an ``included`` flag."""

    business_service_package = models.ForeignKey(
        to=CustomBusinessServicePackage,
        related_name='items',
        on_delete=models.CASCADE,
    )
    included = models.BooleanField(default=False)
def get_upload_path(instance, filename):
    """Return the storage path for an uploaded business gallery image.

    Only the extension of ``filename`` is kept; the base name is replaced
    by a freshly generated UUID4.

    Args:
        instance: object whose ``business_id`` becomes the second path segment.
        filename: name of the uploaded file.

    Returns:
        ``business/<business_id>/business_images/<uuid4><extension>``.
    """
    extension = os.path.splitext(filename)[1]
    segments = ['business', str(instance.business_id), 'business_images', str(uuid.uuid4()) + extension]
    return '/'.join(segments)
class Image(TimeStampedModel, TitleDescriptionModel):
    """A gallery image of a business, sorted by ``ordinal``."""

    business = models.ForeignKey(to=Business, on_delete=models.CASCADE)
    ordinal = models.IntegerField()
    url = models.ImageField(
        verbose_name='Url',
        upload_to=get_upload_path,
        max_length=255,
        blank=True,
        null=True,
    )

    class Meta:
        ordering = ['ordinal']

    @property
    def full_url(self):
        """Absolute URL of the stored file in the ``parting`` S3 bucket."""
        stored_name = str(self.url)
        return 'https://parting.s3.amazonaws.com/' + stored_name
class Event(TimeStampedModel):
    """An event recorded for a business, described by four free-text labels."""

    business = models.ForeignKey(to=Business, on_delete=models.CASCADE)
    type = models.CharField(verbose_name='Type', max_length=255)
    source = models.CharField(verbose_name='Source', max_length=255)
    media_type = models.CharField(verbose_name='Media', max_length=255)
    medium = models.CharField(verbose_name='Medium', max_length=255)
class Lead(TimeStampedModel):
    """A contact request sent to a business, with the selected options."""

    # Who is asking.
    name = models.CharField(max_length=255)
    name_of_deceased = models.CharField(max_length=255, blank=True, null=True, default=None)
    email = models.CharField(max_length=255)
    location = models.CharField(max_length=255, blank=True, null=True, default=None)
    phone = models.CharField(max_length=255, blank=True, null=True, default=None)
    message = models.CharField(max_length=255)

    # Which business, and the quoted figure.
    business = models.ForeignKey(to=Business, on_delete=models.CASCADE)
    estimated_cost = models.FloatField()

    # Option flags stored with the request.
    burial = models.BooleanField(default=False)
    service = models.BooleanField(default=False)
    viewing = models.BooleanField(default=False)
    immediate = models.BooleanField(default=False)
"""
class ServiceGuide(TimeStampedModel):
pass
class ServiceGuideQuestion(TimeStampedModel):
text = models.CharField(max_length=255)
has_optional = models.BooleanField(default=False)
has_tell_me_more = models.BooleanField(default=False)
is_multiple_choice = models.BooleanField(default=False)
is_dropdown = models.BooleanField(default=False)
service_guide = models.ForeignKey(ServiceGuide)
class ServiceGuideAnswer(TimeStampedModel):
text = models.CharField(max_length=255)
question = models.ForeignKey(ServiceGuideQuestion)
"""
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment