Skip to content

Instantly share code, notes, and snippets.



Created Apr 20, 2017
What would you like to do?
"""Provide information about opportunities."""
from collections import OrderedDict
import datetime as dt
import json
from flask import request
from flask_restler import route
from flask_restler.peewee import Filter, ModelFilters
from peewee import PostgresqlDatabase, SQL, Clause
from people_ai.api.v1.teams import TeamModelResource, api, filters as f, get_current_data_version, SQLTickMixin
from import SalesForceOpportunity, SalesForceCompany, OpportunityCustomField
from import OpportunityTick
from import TeamMembership
from people_ai.api.v1.teams.activity import ActivityLogMixin
from people_ai.api.v1.teams.opportunity import filters as f2, schemas
from people_ai.ext import db
from people_ai.utils.csv_utils import csv_response
class UnionModelFilters(ModelFilters):
"""Magic for empty opportunities."""
def filter(self, collection, resource, *args, **kwargs):
"""Filter two collections."""
if 'hide_empty_opps' in request.args:
return None, super(UnionModelFilters, self).filter(collection, resource, *args, **kwargs)
request.filters = {}
data = request.args.get('where')
if not data or self.filters is None:
return collection
data = json.loads(data)
except (ValueError, TypeError):
return collection
filters = [fl for fl in self.filters if fl.fname in data]
base_filters = [fl for fl in filters if not in resource.meta.ticks_filters]
# Clean opportunities query
base_collection =
base_collection._where = None
base_collection._joins = {}
for fl in base_filters:
base_collection = fl.filter(base_collection, data, resource=resource, **kwargs)
for fl in filters:
collection = fl.filter(collection, data, resource=resource, **kwargs)
return (base_collection.filter(
)).order_by(, collection)
class OpportunityResource(TeamModelResource, ActivityLogMixin, SQLTickMixin):
"""Load opportunities."""
class Meta:
"""Tune the resource."""
model = SalesForceOpportunity
name = 'opportunity'
filters_converter = UnionModelFilters
filters = (
# Opportunity Model filters
'amount', 'probability', 'is_closed', 'is_won', 'time_spent', f.QueryFilter('name'),
f.DateTimeFilter('created_at'), f.DateTimeFilter('closed_at'), f.DateTimeFilter('last_touch'),
Filter('stage_name', 'stage'), Filter('company', 'account_id'), f.QueryFilter('type', 'opportunity_type'),
# Company Model filters
# Opportunity Tick Model filters
f2.UTCFilter('utc'), f2.MembershipFilter('membership'), f2.ActivityTypeFilter('type'),
# Revenue filters
f2.OpportunityRevenueMapping('MR'), f2.OpportunityRevenueMapping('AR'), f2.OpportunityRevenueMapping('NR'),
ticks_filters = 'utc', 'membership', 'type'
def filter(self, *args, **kwargs):
"""Save a base collection."""
self.base_collection, collection = super(OpportunityResource, self).filter(*args, **kwargs) # noqa
return collection
def paginate(self, offset=0, limit=None):
"""Pagination magic for two collections."""
collection, count = super(OpportunityResource, self).paginate(offset, limit)
if 'hide_empty_opps' in request.args:
return collection, count
page_size = collection.count()
offset = max(0, offset - count)
count += self.base_collection.count()
self.base_collection = self.base_collection.offset(offset + page_size).limit(limit - page_size) # noqa
return collection, count
def get(self, resource=None, **kwargs):
"""Merge two collections."""
if resource or 'hide_empty_opps' in request.args:
return super(OpportunityResource, self).get(resource, **kwargs)
return self.to_simple(list(self.collection) + list(self.base_collection), many=True, **kwargs)
def sort(self, collection, *sorting, **kwargs):
"""Put null values on last places."""
collection = super(OpportunityResource, self).sort(collection, *sorting, **kwargs)
if isinstance(db.database.obj, PostgresqlDatabase):
collection._order_by = [Clause(collection._order_by[0], SQL('NULLS LAST'))]
return collection
def get_many(self, team=None, opportunity=None, **kwargs):
"""Load opportunities for current team."""
members = self.auth.teammates().select(
qs =, SalesForceCompany).\
where(SalesForceOpportunity.team_id == team).\
join(SalesForceCompany, 'LEFT OUTER').switch(SalesForceOpportunity)
if 'aggregate' not in request.args:
return qs
versions = get_current_data_version(members)
versions = versions['$in']
SalesForceOpportunity, SalesForceCompany, *schemas.OpportunitySchema.aggregate(OpportunityTick)).\
join(OpportunityTick, 'LEFT OUTER').\
where(OpportunityTick.version << versions).\
where(OpportunityTick.utc < dt.datetime.utcnow()).\
def get_schema(self, resource=None, opportunity=None, **kwargs):
"""Provide aggregated information."""
if opportunity:
return schemas.ResourceAggregateOpportunitySchema(instance=resource)
return schemas.OpportunitySchema(instance=resource)
def custom(self, *args, **kwargs):
"""Return loaded custom fields."""
collection = \
where(SalesForceOpportunity.team_id == self.auth.team_id).distinct()
return self.to_json_response([ for field in collection])
def csv(self, *args, **kwargs):
"""Render CSV for opportunities."""
collection = self.filter(self.collection)
file_name = request.args.get('name', 'export.csv')
data = self.to_simple(collection, many=True, **kwargs)
if not data:
return csv_response([], file_name=file_name)
csv_data = self.get_csv_data(
csv_map=OrderedDict([('name', 'Name'), ('id', 'ID'), ('last_touch', 'Last Touch')]),
csv_footer=[['Total Opportunities: %s' % len(data)], ['Opportunities']]
return csv_response(csv_data, file_name)
def activity_csv(self, *args, **kwargs):
"""Render CSV for activities."""
collection = self.filter(self.collection)
opportunities =
where = request.filters
where.update({'opportunity_id': {'$in': [ for x in opportunities]}})
query_string = {
'per_page': 0,
'where': where,
file_name = request.args.get('name', 'export.csv')
csv_data = self.get_related_activities_csv(
name='Opportunity Activities'
return csv_response(csv_data, file_name=file_name)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.