Skip to content

Instantly share code, notes, and snippets.

@djarum-black
Created March 28, 2023 03:02
Show Gist options
  • Save djarum-black/19ca178314a33450099671eef189a243 to your computer and use it in GitHub Desktop.
Save djarum-black/19ca178314a33450099671eef189a243 to your computer and use it in GitHub Desktop.
xxx
import arrow
import numpy as np
import pandas as pd
from django.db.models import Q
import numpy as np
from agents.models import Agent, Source, Team, AgentTarget, AgentSaleTarget
from takvim.models import Deal
from functools import lru_cache
from typing import List, Dict, Tuple, Optional, Union
from dataclasses import dataclass, field
from calendar import monthrange
from collections import Counter
@dataclass
class Indicator:
    '''
    One KPI value together with the value it is compared against.

    ``change``, ``change_percentage``, ``direction`` and ``value_str`` are
    derived in ``__post_init__`` and cannot be passed to the constructor.
    ``css`` is recomputed there as well, so a value passed for it is replaced.
    '''

    value: float
    old_value: float
    change: float = field(init=False)
    change_percentage: float = field(init=False)
    direction: str = field(init=False)
    value_str: str = field(init=False)
    css: str = ""
    name: str = "No name"
    prefix: str = ""
    suffix: str = ""
    precision: int = 0  # stored, but not applied when value_str is built

    def __post_init__(self):
        delta = self.value - self.old_value
        self.change = delta

        # A zero baseline has no meaningful percentage; report 0 instead.
        if self.old_value != 0:
            self.change_percentage = delta / self.old_value * 100
        else:
            self.change_percentage = 0

        # Only a strictly positive change counts as "up"; no change is "down".
        if delta > 0:
            self.direction, self.css = "up", "success"
        else:
            self.direction, self.css = "down", "danger"

        self.value_str = "".join(
            [f"{self.prefix}", f"{self.value}", f"{self.suffix}"]
        )
# Import-time snapshots of agents and sources, used by the row-wise helpers.
# Agents without a team are labelled "No team".
agents = Agent.objects.all()
agents_data = [
    {
        "name": agent.name,
        "team": "No team" if not agent.team else agent.team.name,
    }
    for agent in agents
]
sources = Source.objects.all()
sources_data = [dict(name=source.name) for source in sources]
def create_new_deal_stage(row, start_date, end_date):
    '''
    Return the report stage of one deal row for the given date range.

    The timestamp attributes of ``row`` are checked from the most advanced
    stage to the least advanced one. The first timestamp that is set and lies
    strictly between ``start_date`` and ``end_date`` decides the result.

    row: object exposing the ``deal_updated_at_*``, ``cf_*_task_completed``,
         ``created_at`` and ``closed_date`` attributes
    start_date / end_date: exclusive bounds, comparable with those attributes
    return: stage label, or "raporda olmayacak" when nothing falls in range
    '''
    # TODO: updated_at could be added.
    # TODO: 2nd call, 3rd call and proposal reminder tasks could be added.
    ranked_fields = (
        ("deal_updated_at_6", "Deposit Process"),
        ("deal_updated_at_5", "Second Contact & Quote"),
        ("cf_proposal_reminder_task_completed", "Second Contact & Quote"),
        ("deal_updated_at_4", "Information Received"),
        ("deal_updated_at_3", "Waiting for Information"),
        ("deal_updated_at_2", "First Contact"),
        ("cf_2nd_call_task_completed", "First Contact"),
        ("cf_3rd_call_task_completed", "First Contact"),
        ("deal_updated_at_1", "New"),
    )
    for field_name, stage in ranked_fields:
        moment = getattr(row, field_name)
        if moment and start_date < moment < end_date:
            return stage

    # created_at is compared without a "set" check, as it always was.
    if start_date < row.created_at < end_date:
        return "New"

    # Mert's idea, verified by him: a deal closed inside the range is "Lost".
    closed = row.closed_date
    if closed and start_date < closed < end_date:
        return "Lost"

    # Runtime marker for rows that do not belong in the report.
    return "raporda olmayacak"
def create_new_deal_stage_vectorized(df, start_date, end_date):
    '''
    Add a ``new_deal_stage`` column to ``df`` in place and return ``df``.

    Each row gets the label of the first listed timestamp column whose value
    lies strictly between ``start_date`` and ``end_date``. Rows with no such
    timestamp get the marker 'NO'.
    '''
    print("rapor elemesi:", start_date, end_date)

    def _inside(column):
        stamps = df[column]
        return (stamps.notnull()) & (stamps > start_date) & (stamps < end_date)

    # Priority order: earlier entries win when several columns match.
    ranked_columns = [
        ('deal_updated_at_6', 'Deposit Process'),
        ('closed_date', 'Lost'),
        ('deal_updated_at_5', 'Second Contact & Quote'),
        ('cf_proposal_reminder_task_completed', 'Second Contact & Quote'),
        ('deal_updated_at_4', 'Information Received'),
        ('deal_updated_at_3', 'Waiting for Information'),
        ('deal_updated_at_2', 'First Contact'),
        ('cf_2nd_call_task_completed', 'First Contact'),
        ('cf_3rd_call_task_completed', 'First Contact'),
        ('deal_updated_at_1', 'New'),
    ]
    conditions = [_inside(column) for column, _ in ranked_columns]
    choices = [label for _, label in ranked_columns]

    # created_at is the last fallback and is compared without a null check.
    created = df['created_at']
    conditions.append((created > start_date) & (created < end_date))
    choices.append('New')

    df['new_deal_stage'] = np.select(conditions, choices, default='NO')
    return df
def create_team(row):
    '''
    Return the team of the agent whose name equals ``row.owner``.

    The lookup scans the module-level ``agents_data`` list. The first match
    wins, and "No team" is returned when no agent matches.
    '''
    matching_teams = (
        entry["team"] for entry in agents_data if row.owner == entry["name"]
    )
    return next(matching_teams, "No team")
def apply_team_vectorized(df):
    '''
    Add a ``team`` column to ``df`` in place from its ``owner`` column.

    Owners missing from the module-level ``agents_data`` list get "No team".
    Returns the same ``df``.
    '''
    team_by_agent = {}
    for entry in agents_data:
        team_by_agent[entry["name"]] = entry["team"]

    def _team_for(owner):
        return team_by_agent.get(owner, "No team")

    df["team"] = df["owner"].map(_team_for)
    return df
def create_source(row):
    '''
    Return ``row.source`` when it is a known source name, otherwise "others".

    Known names come from the module-level ``sources_data`` list.
    '''
    known_names = (
        entry["name"] for entry in sources_data if row.source == entry["name"]
    )
    return next(known_names, "others")
# Names of all stored sources. This rebinds the module-level ``sources`` name.
sources = [entry.name for entry in Source.objects.all()]
# Sources reported under their own name; anything else is grouped as 'others'
# by create_source_vectorized.
source_mapping = {
    label: label
    for label in ('facebook', 'google', 'influencer', 'whatclinic')
}
def create_source_vectorized(source):
    '''
    Map one raw source value to its report label.

    Falsy values and values missing from ``source_mapping`` become 'others'.
    '''
    return source_mapping.get(source, 'others') if source else 'others'
# Second Contact
def create_second_contact(row):
    '''
    Return 1 when the deal row counts as a second contact, otherwise 0.

    A row counts when ``row.new_deal_stage`` is one of the advanced stages, or
    when it is "Lost" for a reason other than the listed ones.

    The "Lost" branch used to test ``in deal_reasons``. That disagreed with
    create_second_contact_vectorized and with the reach/interest helpers,
    which all exclude these reasons, so the test is now ``not in``.
    '''
    advanced_stages = [
        "Second Contact & Quote",
        "Deposit Process",
        "Operation Done",
        "Operation Team Briefing",
    ]
    excluded_reasons = [
        "Not contacted - 4 times",
        "Not contacted",
        "Not interested",
        "Other",
    ]
    stage = row.new_deal_stage
    if stage in advanced_stages:
        return 1
    # deal_reason is only read for lost deals.
    if stage == "Lost" and row.deal_reason not in excluded_reasons:
        return 1
    return 0
# Second contact (vectorized)
def create_second_contact_vectorized(df):
    '''
    Add a 0/1 ``second_contact`` column to ``df`` in place and return ``df``.

    A row is flagged when ``new_deal_stage`` is one of the advanced stages, or
    when it is "Lost" with a ``deal_reason`` outside the listed reasons.
    '''
    advanced_stages = [
        "Second Contact & Quote",
        "Deposit Process",
        "Operation Done",
        "Operation Team Briefing",
    ]
    excluded_reasons = [
        "Not contacted - 4 times",
        "Not contacted",
        "Not interested",
        "Other",
    ]
    stage = df["new_deal_stage"]
    reached_stage = stage.isin(advanced_stages)
    lost_after_contact = (stage == "Lost") & (
        ~df["deal_reason"].isin(excluded_reasons)
    )
    df["second_contact"] = (reached_stage | lost_after_contact).astype(int)
    return df
'''
if not ISNULL([Deal Updated At 6]) and [Deal Updated At 6]>[End Date] then 'next range sale'
elseif not ISNULL([Deal Updated At 6]) and [Deal Updated At 6]< [Start Date] then 'previous range sale'
else 'in range sale'
END'''
def create_range_vectorized(df, start_date, end_date):
    '''
    Classify each row by where its sale timestamp falls relative to the range.

    Reads ``df['deal_updated_at_6']`` and returns a Series aligned with
    ``df.index`` holding:
      - 'next range sale'     when the timestamp is after ``end_date``
      - 'previous range sale' when the timestamp is before ``start_date``
      - 'in range sale'       otherwise, including rows with no timestamp

    The previous row-wise ``df.apply(..., axis=1)`` returned a DataFrame
    instead of a Series for an empty frame. This column-wise form returns a
    Series for any number of rows.
    '''
    sale_time = df['deal_updated_at_6']
    # Comparisons against a missing timestamp are False, so those rows fall
    # through to the default label.
    after_range = (sale_time > end_date).to_numpy(dtype=bool)
    before_range = (sale_time < start_date).to_numpy(dtype=bool)
    labels = np.select(
        [after_range, before_range],
        ['next range sale', 'previous range sale'],
        default='in range sale',
    )
    return pd.Series(labels, index=df.index, dtype=object)
def create_new_reach_vectorized(df):
    '''
    Return a 0/1 Series flagging the rows of ``df`` that count as reached.

    A row is reached when any of the progress timestamps is set, or when
    ``deal_stage`` is 'Lost' for a reason other than the "not contacted" ones.

    Missing values are detected with ``notna()``. The previous truthiness
    test treated NaT/NaN as set, which flagged rows with no timestamp at all.
    '''
    progress_columns = [
        'deal_updated_at_3',
        'deal_updated_at_4',
        'deal_updated_at_5',
        'deal_updated_at_6',
        'cf_proposal_reminder_task_completed',
    ]
    unreached_reasons = ['Not contacted - 4 times', 'Not contacted']

    progressed = df[progress_columns].notna().any(axis=1)
    lost_but_reached = (df['deal_stage'] == 'Lost') & (
        ~df['deal_reason'].isin(unreached_reasons)
    )
    return (progressed | lost_but_reached).astype(int)
def create_new_interest_vectorized(df):
    '''
    Return a 0/1 Series flagging the rows of ``df`` that count as interested.

    A row is interested when any of the progress timestamps is set, or when
    ``deal_stage`` is 'Lost' for a reason outside the listed ones.

    Missing values are detected with ``notna()``. The previous truthiness
    test treated NaT/NaN as set, which flagged rows with no timestamp at all.
    '''
    progress_columns = [
        'deal_updated_at_3',
        'deal_updated_at_4',
        'deal_updated_at_5',
        'deal_updated_at_6',
        'cf_proposal_reminder_task_completed',
    ]
    uninterested_reasons = [
        'Not contacted - 4 times',
        'Not contacted',
        'Not interested',
        'Other',
    ]

    progressed = df[progress_columns].notna().any(axis=1)
    lost_but_interested = (df['deal_stage'] == 'Lost') & (
        ~df['deal_reason'].isin(uninterested_reasons)
    )
    return (progressed | lost_but_interested).astype(int)
def create_new_second_contact_vectorized(df):
    '''
    Return a 0/1 Series flagging the rows of ``df`` that count as a second
    contact.

    A row counts when ``deal_updated_at_5``, ``deal_updated_at_6`` or
    ``cf_proposal_reminder_task_completed`` is set, or when ``new_deal_stage``
    is 'Lost' for a reason outside the listed ones.

    Two defects of the previous row-wise version are fixed here: a stray
    ``not`` flagged every row that had no ``deal_updated_at_5``, and the
    truthiness test treated NaT/NaN as set.
    '''
    progress_columns = [
        'deal_updated_at_5',
        'deal_updated_at_6',
        'cf_proposal_reminder_task_completed',
    ]
    excluded_reasons = [
        'Not contacted - 4 times',
        'Not contacted',
        'Not interested',
        'Other',
    ]

    progressed = df[progress_columns].notna().any(axis=1)
    lost_after_contact = (df['new_deal_stage'] == 'Lost') & (
        ~df['deal_reason'].isin(excluded_reasons)
    )
    return (progressed | lost_after_contact).astype(int)
def create_reach(row):
    '''
    Return 1 when the deal row counts as reached, otherwise 0.

    A row is reached when ``row.new_deal_stage`` is one of the listed stages,
    or when it is "Lost" for a reason other than the "not contacted" ones.
    '''
    reached_stages = [
        "Second Contact & Quote",
        "Deposit Process",
        "Waiting for Information",
        "Information Received",
        "Operation Done",
        "Operation Team Briefing",
    ]
    unreached_reasons = ["Not contacted - 4 times", "Not contacted"]

    if row.new_deal_stage in reached_stages:
        return 1
    # deal_reason is only read for lost deals.
    if row.new_deal_stage == "Lost" and row.deal_reason not in unreached_reasons:
        return 1
    return 0
# create reach (vectorized) -- the original Tableau formula follows
'''
"if not ISNULL([Deal Updated At 6]) or
not ISNULL([Deal Updated At 5])or
not ISNULL([Deal Updated At 4])or
not ISNULL([Deal Updated At 3])or
not ISNULL([Cf Proposal Reminder Task Completed])or
#([Deal Stage]= 'Lost' and not [Deal Reason] in (""Not contacted - 4 times"", ""Not contacted""))
then 1 else 0 END"
'''
def apply_create_reach_vectorized(df):
    '''
    Add a 0/1 ``reach`` column to ``df`` in place and return ``df``.

    A row is flagged when ``new_deal_stage`` is one of the listed stages, or
    when it is "Lost" with a ``deal_reason`` other than the "not contacted"
    ones.

    The function used to return None. It now returns ``df`` like the other
    in-place vectorized helpers; callers that ignore the result are unaffected.
    '''
    deal_stages = [
        "Second Contact & Quote",
        "Deposit Process",
        "Waiting for Information",
        "Information Received",
        "Operation Done",
        "Operation Team Briefing",
    ]
    deal_reasons = [
        "Not contacted - 4 times",
        "Not contacted",
    ]
    stage = df["new_deal_stage"]
    reached_stage = stage.isin(deal_stages)
    lost_but_reached = (stage == "Lost") & (~df["deal_reason"].isin(deal_reasons))
    df["reach"] = (reached_stage | lost_but_reached).astype(int)
    return df
def create_interest(row):
    '''
    Return 1 when the deal row counts as interested, otherwise 0.

    A row is interested when ``row.deal_stage`` is one of the listed stages,
    or when it is "Lost" for a reason outside the listed ones.
    '''
    # NOTE(review): this reads row.deal_stage, while create_reach and
    # create_second_contact read row.new_deal_stage -- confirm this is intended.
    interested_stages = [
        "Second Contact & Quote",
        "Deposit Process",
        "Waiting for Information",
        "Information Received",
        "Operation Done",
        "Operation Team Briefing",
    ]
    uninterested_reasons = [
        "Not contacted - 4 times",
        "Not contacted",
        "Not interested",
        "Other",
    ]

    if row.deal_stage in interested_stages:
        return 1
    # deal_reason is only read for lost deals.
    if row.deal_stage == "Lost" and row.deal_reason not in uninterested_reasons:
        return 1
    return 0
def apply_create_interest_vectorized(df):
    '''
    Add a 0/1 ``interest`` column to ``df`` in place and return ``df``.

    A row is flagged when ``new_deal_stage`` is one of the listed stages, or
    when it is "Lost" with a ``deal_reason`` outside the listed reasons.

    The function used to return None. It now returns ``df`` like the other
    in-place vectorized helpers; callers that ignore the result are unaffected.
    '''
    deal_stages = [
        "Second Contact & Quote",
        "Deposit Process",
        "Waiting for Information",
        "Information Received",
        "Operation Done",
        "Operation Team Briefing",
    ]
    deal_reasons = [
        "Not contacted - 4 times",
        "Not contacted",
        "Not interested",
        "Other",
    ]
    stage = df["new_deal_stage"]
    interested_stage = stage.isin(deal_stages)
    lost_but_interested = (stage == "Lost") & (
        ~df["deal_reason"].isin(deal_reasons)
    )
    df["interest"] = (interested_stage | lost_but_interested).astype(int)
    return df
# create paid / non paid
def create_traffic_type(row):
    '''
    Return "paid" when ``row.cost`` is set and greater than zero, otherwise
    "non-paid".
    '''
    has_spend = row.cost and row.cost > 0
    return "paid" if has_spend else "non-paid"
# Sale Status
# Fixed Europe/Istanbul timestamps (start of 2019 and start of 2079).
_s, _e = (
    pd.Timestamp(day, tz="Europe/Istanbul")
    for day in ("2019-01-01", "2079-01-01")
)
def create_sale_status(row, start_date, end_date):
    '''
    Describe the sale state of one deal row.

    return: "No Sale" when ``row.deal_updated_at_6`` is not set,
            "Sale in Report Range" when it lies strictly between
            ``start_date`` and ``end_date``, otherwise "Sale"
    '''
    sold_at = row.deal_updated_at_6
    if not sold_at:
        return "No Sale"
    if start_date < sold_at < end_date:
        return "Sale in Report Range"
    return "Sale"
# Sale status (vectorized)
def create_sale_status_vectorized(df, start_date, end_date):
    '''
    Add ``sale_status`` and ``sale_in_report_range`` columns to ``df`` in place.

    ``sale_status`` is 1 where ``deal_updated_at_6`` is set and 0 elsewhere.
    ``sale_in_report_range`` is initialised to 0 and not changed here.
    ``start_date`` and ``end_date`` are accepted but currently unused.

    return: the ``sale_status`` column
    '''
    # Both columns start at zero.
    df['sale_status'] = 0
    df['sale_in_report_range'] = 0

    # Any recorded sale timestamp counts, whatever the report range.
    has_sale = df['deal_updated_at_6'].notnull()
    df['sale_status'] = df['sale_status'].mask(has_sale, 1)
    return df['sale_status']
def create_contact_type(row, start_date, end_date):
    '''
    Return "Old" when ``row.created_at`` is before ``start_date``, otherwise
    "Fresh". ``end_date`` is accepted but not used.
    '''
    return "Old" if row.created_at < start_date else "Fresh"
'''
"IF [Created At]<= [End Date]and [Created At]>= [Start Date] then 'fresh'
ELSEIF [Deal Updated At 6]>=[Start Date] and [Deal Updated At 6]<=[End Date]then 'fresh'
ELSEIF [Created At]<[Start Date] then 'old'
else 'not in report'
END"'''
def create_contact_type_vectorized(df, start_date, end_date):
    '''
    Add a ``contact_type`` column to ``df`` in place and return that column.

    Labels, in order of precedence:
      - 'fresh' when ``deal_updated_at_6`` lies inside the inclusive range
      - 'fresh' when ``created_at`` lies inside the inclusive range
      - 'old'   when ``created_at`` is before ``start_date``
      - 'not in report' otherwise
    '''
    created = df['created_at']
    sold = df['deal_updated_at_6']

    created_in_range = (start_date <= created) & (created <= end_date)
    created_before = created < start_date
    sold_in_range = (sold.notnull()) & (start_date <= sold) & (sold <= end_date)

    # A sale inside the range outranks an old creation date, so it is listed
    # first.
    df['contact_type'] = np.select(
        [sold_in_range, created_in_range, created_before],
        ['fresh', 'fresh', 'old'],
        default='not in report',
    )
    return df['contact_type']
def safe_div(x, y):
    '''
    Return ``x / y``, or 0 when ``y`` equals zero.
    '''
    return 0 if y == 0 else x / y
def create_stats(df):
    '''
    Aggregate a processed deals dataframe into a flat dict of report KPIs.

    Reads the columns ``new_deal_stage``, ``contact_type``, ``owner``,
    ``section``, ``source``, ``reach``, ``interest``, ``second_contact``,
    ``sale_status``, ``cost``, ``amount``, ``revenue_in_report_range``,
    ``cost_in_report_range`` and ``sale_in_report_range``.
    Every ratio goes through ``safe_div``, so a zero denominator gives 0.
    '''
    total = len(df)
    stage_counts = dict(df["new_deal_stage"].value_counts())
    contact_counts = df['contact_type'].value_counts()
    owners = df["owner"].nunique()
    sections = df["section"].nunique()
    source_count = df["source"].nunique()

    reach_sum = df["reach"].sum()
    interest_sum = df["interest"].sum()
    second_contact_sum = df["second_contact"].sum()
    sale_sum = df["sale_status"].sum()
    cost_sum = df["cost"].sum()
    revenue_sum = df["amount"].sum()
    revenue_in_range_sum = df["revenue_in_report_range"].sum()  # to be checked
    cost_in_range_sum = df["cost_in_report_range"].sum()
    sale_in_range = df["sale_in_report_range"].sum()  # verified

    fresh_contacts = contact_counts.get('Fresh', 0)
    early_stage_deals = stage_counts.get("New", 0) + stage_counts.get("First Contact", 0)

    def _share_of_total(amount):
        return safe_div(amount, total)

    def _per_owner(amount):
        return safe_div(amount, owners)

    def _step_conversion(previous_step, next_step):
        # Inverse of previous/next; collapses to 0 when either division does.
        return safe_div(1, safe_div(previous_step, next_step))

    # Key order is kept exactly as before.
    return {
        "total": total,
        "total_cost": cost_sum,
        "total_revenue": revenue_sum,
        "total_revenue_in_range": revenue_in_range_sum,
        "total_sale_in_range": sale_in_range,
        "total_cost_in_range": cost_in_range_sum,
        "total_contact_in_range": fresh_contacts,
        "average_cost": _share_of_total(cost_sum),
        "average_cost_per_agent": _per_owner(cost_sum),
        "average_revenue_in_range": safe_div(revenue_in_range_sum, sale_in_range),
        "average_revenue": safe_div(revenue_sum, sale_sum),
        "reach_sum": reach_sum,
        "interest_sum": interest_sum,
        "second_contact_sum": second_contact_sum,
        "sale_sum": sale_sum,
        "new_ratio": _share_of_total(early_stage_deals),
        "reach_2_interest": _step_conversion(reach_sum, interest_sum),
        "interest_2_second_contact": _step_conversion(interest_sum, second_contact_sum),
        "second_contact_2_sale": _step_conversion(second_contact_sum, sale_sum),
        "reach_rate": _share_of_total(reach_sum),
        "interest_rate": _share_of_total(interest_sum),
        "second_contact_rate": _share_of_total(second_contact_sum),
        "sale_rate": _share_of_total(sale_sum),
        "aov": safe_div(revenue_sum, sale_sum),
        "roas": safe_div(revenue_sum, cost_sum),
        # NOTE(review): divides in-range revenue by the TOTAL cost, not by
        # cost_in_range_sum -- confirm this is intended.
        "roas_in_range": safe_div(revenue_in_range_sum, cost_sum),
        "active_agent": owners,
        "avg_new_contact": _per_owner(fresh_contacts),
        "avg_contact": _per_owner(total),
        "avg_sale_in_month": _per_owner(sale_in_range),
        "avg_sale_owner": _per_owner(sale_sum),
        "avg_sale_section": safe_div(sale_sum, sections),
        "avg_sale_source": safe_div(sale_sum, source_count),
        "total_deals_per_owner": _per_owner(total),
        "total_cost_per_owner": _per_owner(cost_sum),
        "total_revenue_per_owner": _per_owner(revenue_sum),
        "total_revenue_in_range_per_owner": _per_owner(revenue_in_range_sum),
        "total_sale_in_range_per_owner": _per_owner(sale_in_range),
        "total_cost_in_range_per_owner": _per_owner(cost_in_range_sum),
    }
def convert_date_columns_to_datastudio_format(df):
    '''
    Replace the date columns of ``df`` in place with ``%Y%m%d%H%M%S`` strings.

    Converts ``created_at``, ``closed_date`` and ``deal_updated_at_1`` through
    ``deal_updated_at_8``; each must support the ``.dt`` accessor.
    Returns the same ``df``.
    '''
    stage_columns = [f"deal_updated_at_{step}" for step in range(1, 9)]
    for name in ["created_at", "closed_date", *stage_columns]:
        df[name] = df[name].dt.strftime("%Y%m%d%H%M%S")
    return df
def _get_data(start_date, end_date):
    '''
    Load the deals relevant to a date range into a dataframe.

    A deal is selected when it was created on or before ``end_date`` and is
    either still open (no ``closed_date``) or closed on or after ``start_date``.

    start_date: str,
    end_date: str
    return: pandas.DataFrame with one column per selected field
    '''
    columns = [
        "id",
        "amount",
        "source",
        "campaign",
        "section",
        "created_at",
        "closed_date",
        "owner",
        "cost_monthly",
        "deal_stage",
        "deal_reason",
        "deal_updated_at_1",
        "deal_updated_at_2",
        "deal_updated_at_3",
        "deal_updated_at_4",
        "deal_updated_at_5",
        "deal_updated_at_6",
        "deal_updated_at_7",
        "deal_updated_at_8",
        "cf_proposal_reminder_task_completed",
        "cf_2nd_call_task_completed",
        "cf_3rd_call_task_completed",
        'country',
    ]
    open_or_closed_after_start = Q(closed_date__gte=start_date) | Q(
        closed_date__isnull=True
    )
    queryset = Deal.objects.filter(created_at__lte=end_date)
    queryset = queryset.filter(open_or_closed_after_start)
    return pd.DataFrame(queryset.values(*columns))
def get_data(start_date="2023-01-10", end_date="2023-01-20"):
    '''
    Load the deals for a date range and add the derived report columns using
    the row-wise helpers.

    Added columns: new_deal_stage, team, new_source, second_contact, reach,
    interest, sale_status, contact_type, traffic_type.

    start_date / end_date: values accepted by ``arrow.get``
    return: pandas.DataFrame
    '''
    df = _get_data(start_date, end_date)

    # _get_data returns the cost as "cost_monthly", but create_traffic_type
    # and the fillna below read "cost". Rename it the same way
    # process_and_limit_all_data does.
    df = df.rename(columns={"cost_monthly": "cost"})

    # Parse the range once instead of once per row inside each lambda.
    range_start = arrow.get(start_date).date()
    range_end = arrow.get(end_date).date()

    df["new_deal_stage"] = df.apply(
        lambda row: create_new_deal_stage(row, range_start, range_end),
        axis=1,
    )
    df["team"] = df.apply(create_team, axis=1)
    df["new_source"] = df.apply(create_source, axis=1)
    df["second_contact"] = df.apply(create_second_contact, axis=1)
    df["reach"] = df.apply(create_reach, axis=1)
    df["interest"] = df.apply(create_interest, axis=1)
    df["sale_status"] = df.apply(
        lambda row: create_sale_status(row, range_start, range_end),
        axis=1,
    )
    df["contact_type"] = df.apply(
        lambda row: create_contact_type(row, range_start, range_end),
        axis=1,
    )
    df["traffic_type"] = df.apply(create_traffic_type, axis=1)

    # Assign the result: chained ``fillna(..., inplace=True)`` on a column
    # selection is not guaranteed to write back to ``df``.
    df["cost"] = df["cost"].fillna(0)
    return df
def process_and_limit_all_data(df, start_date, end_date):
    '''
    Receives a dataframe of all deals and returns a dataframe of deals that are in the report range and
    process it to be used in the report with extra columns:
    - new_deal_stage
    - team
    - new_source
    - second_contact
    - reach
    - interest
    - sale_status
    - contact_type
    - traffic_type

    ``df`` is modified in place (columns renamed and added, rows dropped) and
    the same object is returned. ``end_date`` is extended to 23:59:59 of its
    day before comparing.
    '''
    print("new 2")
    start_date = arrow.get(start_date)
    end_date = arrow.get(end_date).replace(hour=23, minute=59, second=59)
    start_date_utc = pd.Timestamp(start_date.datetime)
    end_date_utc = pd.Timestamp(end_date.datetime)
    print("process and limit dates", start_date, end_date, start_date_utc, end_date_utc)
    # Expose cost_monthly under the name "cost".
    df.rename(columns={'cost_monthly': 'cost'}, inplace=True)
    df['range'] = create_range_vectorized(df, start_date_utc, end_date_utc)
    # Build the new deal stage for the selected date range.
    create_new_deal_stage_vectorized(df, start_date_utc, end_date_utc)
    # Drop the records that fall outside the date range.
    df.drop(df[df["new_deal_stage"] == "NO"].index, inplace=True)
    df.drop(df[df["range"] == "previous range sale"].index, inplace=True)
    apply_team_vectorized(df)
    df["new_source"] = df['source'].map(create_source_vectorized)
    create_second_contact_vectorized(df)
    apply_create_reach_vectorized(df)
    apply_create_interest_vectorized(df)
    create_sale_status_vectorized(df, start_date_utc, end_date_utc)
    # NOTE(review): the contact_type written by this call is overwritten by
    # the np.where line right below -- confirm which labelling is intended.
    create_contact_type_vectorized(df, start_date_utc, end_date_utc)
    df['contact_type'] = np.where(df['created_at'] < start_date_utc, 'Old', 'Fresh')
    df['traffic_type'] = np.where(df['cost'] > 0, 'paid', 'non-paid')
    df['cost_in_report_range'] = np.where(df['contact_type'] == 'Fresh', df['cost'], 0)
    df['revenue_in_report_range'] = np.where(df['range'] == 'in range sale', df['amount'], 0)
    df['sale_in_report_range'] = np.where(df['range'] == 'in range sale', df['sale_status'], 0)
    # Assign the result: chained ``fillna(..., inplace=True)`` on a column
    # selection is not guaranteed to write back to ``df``.
    df["cost"] = df["cost"].fillna(0)
    df.replace({np.nan: None}, inplace=True)
    return df
@lru_cache(maxsize=10)
def get_report_data(start_date, end_date):
    '''
    Return the processed report dataframe for a date range.

    All deals are loaded with a fixed wide range and then limited to the
    requested range by ``process_and_limit_all_data``. Results are memoised
    per ``(start_date, end_date)``, so repeated calls return the SAME
    dataframe object; callers that modify it should copy it first.
    '''
    print("get_report_data dates", start_date, end_date)
    return process_and_limit_all_data(
        _get_data('2020-01-01', '2067-01-01'),
        start_date,
        end_date,
    )
def get_report_data_with_comparison(
    start_date: str,
    end_date: str,
    comparison_start_date: str,
    comparison_end_date: str
) -> Tuple[pd.DataFrame, Dict[str, Indicator]]:
    '''
    Return the report dataframe for a range together with KPI indicators that
    compare it against a second range.

    return: ``(df, indicators)``; ``df`` is the report data of the main range
            and ``indicators`` maps each stat name to an ``Indicator`` whose
            ``old_value`` comes from the comparison range
    '''
    print("comp dates:", start_date, end_date, comparison_start_date, comparison_end_date)
    df = get_report_data(start_date, end_date)
    comparison_df = get_report_data(comparison_start_date, comparison_end_date)
    indicators = create_indicators(create_stats(df), create_stats(comparison_df))
    return df, indicators
def get_report_data_with_comparison_by_agent(
    start_date: str,
    end_date: str,
    comparison_start_date: str,
    comparison_end_date: str,
    agent: Agent
) -> Tuple[
    pd.DataFrame,
    Dict[str, Indicator],
    Dict[str, Indicator],
    Dict[str, Indicator],
]:
    '''
    Return one agent's report data with three sets of KPI indicators.

    return: ``(df, indicators, team_indicators, brand_indicators)``
      - df: rows of the main range owned by ``agent``
      - indicators: the agent's stats against their own comparison range
      - team_indicators: the agent's stats against their team's stats
      - brand_indicators: the agent's stats against the stats of the agent's
        first section

    An agent without a team is compared against the "No team" rows. An agent
    without any section is compared against an empty frame. Both cases used
    to raise AttributeError.
    '''
    report_df = get_report_data(start_date, end_date)
    previous_df = get_report_data(comparison_start_date, comparison_end_date)

    df = report_df[report_df['owner'] == agent.name]
    comparison_df = previous_df[previous_df['owner'] == agent.name]

    # "No team" is the label the team column carries for agents without a team.
    team_name = agent.team.name if agent.team else "No team"
    team_df = report_df[report_df['team'] == team_name]
    team_stats = create_stats(team_df)

    # .first() gives None when the agent has no section at all.
    section = agent.sections.all().first()
    if section is None:
        brand_df = report_df.iloc[0:0]
    else:
        brand_df = report_df[report_df['section'] == section.name]
    brand_stats = create_stats(brand_df)

    stats = create_stats(df)
    comparison_stats = create_stats(comparison_df)
    indicators = create_indicators(stats, comparison_stats)
    team_indicators = create_indicators(stats, team_stats)
    brand_indicators = create_indicators(stats, brand_stats)
    return df, indicators, team_indicators, brand_indicators
def _get_prefix(key: str) -> str:
    '''
    Return the display prefix for a stat key: '$' for the listed money keys,
    otherwise an empty string.
    '''
    money_keys = (
        'total_cost',
        'profit',
        'revenue',
        'total_revenue',
        'cost',
        'revenue_in_report_range',
        'cost_in_report_range',
    )
    return '$' if key in money_keys else ''
def _get_suffix(key: str) -> str:
    '''
    Return the display suffix for a stat key: '%' for 'revenue', 'cost' and
    'profit', otherwise an empty string.
    '''
    percent_keys = ('revenue', 'cost', 'profit')
    return '%' if key in percent_keys else ''
def create_indicators(
    stats: Dict[str, Union[float, int]],
    comparison_stats: Dict[str, Union[float, int]]
) -> Dict[str, Indicator]:
    '''
    Build one ``Indicator`` per stat present in both dicts.

    ``stats`` supplies the current value and ``comparison_stats`` the old
    value. Keys missing from ``comparison_stats`` are skipped. The result
    keeps the key order of ``stats``.
    '''
    indicators = {}
    for key, value in stats.items():
        if key not in comparison_stats:
            continue
        indicators[key] = Indicator(
            name=key,
            value=value,
            old_value=comparison_stats[key],
            prefix=_get_prefix(key),
            suffix=_get_suffix(key),
        )
    return indicators
def get_data_studio_report_data(start_date, end_date):
    '''
    Return the report data for a range with its date columns formatted as
    ``%Y%m%d%H%M%S`` strings.

    ``get_report_data`` is memoised and hands back the same dataframe object
    on every call, and the date conversion works in place. The conversion
    therefore runs on a copy so the cached frame keeps its datetime columns.
    '''
    df = get_report_data(start_date, end_date).copy()
    return convert_date_columns_to_datastudio_format(df)
def filter_dataframe(df, countries=None, sections=None, agents=None, sources=None):
    '''
    Narrow ``df`` by the filters that are given; empty or None filters are
    skipped.

    countries: iterable of country codes, upper-cased before matching the
               ``country`` column
    sections / agents / sources: iterables of objects with a ``name``
               attribute, matched against ``section`` / ``owner`` / ``source``
    return: the filtered dataframe (``df`` itself when no filter applies)
    '''
    if countries:
        wanted_countries = [code.upper() for code in countries]
        print(wanted_countries)
        country_mask = df['country'].isin(wanted_countries)
        df = df[country_mask]

    if sections:
        section_names = [item.name for item in sections]
        print(section_names)
        section_mask = df['section'].isin(section_names)
        df = df[section_mask]

    if agents:
        agent_names = [item.name for item in agents]
        print(agent_names)
        owner_mask = df['owner'].isin(agent_names)
        df = df[owner_mask]

    if sources:
        source_names = [item.name for item in sources]
        source_mask = df['source'].isin(source_names)
        df = df[source_mask]

    return df
@dataclass
class ChartRow:
    '''
    One row of the agent performance chart.

    target: target count for the month
    actual: count achieved so far
    month_completion: elapsed fraction of the month

    Derived attributes set in ``__post_init__``:
      projection    -- ``actual`` extrapolated to the whole month; 0 when
                       ``month_completion`` is 0, where the division would
                       otherwise raise ZeroDivisionError
      target_as_now -- share of ``target`` expected at this point of the month
      range         -- ``target`` plus 25
    '''

    target: int
    actual: int
    month_completion: float

    def __post_init__(self):
        if self.month_completion:
            self.projection = self.actual / self.month_completion
        else:
            # No elapsed share to extrapolate from.
            self.projection = 0
        self.target_as_now = self.target * self.month_completion
        self.range = self.target + 25
def _get_new_source(source):
    '''
    Return ``source`` unchanged when it is one of the individually tracked
    sources, otherwise "others".
    '''
    tracked_sources = ["facebook", "google", "whatclinic", "influencer"]
    return source if source in tracked_sources else "others"
def get_agent_performance_chart(agent: Agent, month: int, year: int) -> Dict[str, ChartRow]:
    '''
    Build the per-source target/actual chart rows of one agent for a month.

    For every ``AgentTarget`` of the agent in that month the result holds a
    ``ChartRow`` keyed by the target's source name. ``actual`` is the number
    of the agent's deals created in that month whose source, after grouping
    untracked sources as "others", equals that name.

    The statements that followed ``return result`` could never run and
    referenced dataframe columns that nothing here provides; they were
    removed.
    '''
    targets = AgentTarget.objects.filter(agent=agent, month__month=month, month__year=year)

    # NOTE(review): the elapsed share always uses TODAY's day of month, even
    # when ``month``/``year`` is not the current month -- confirm intended.
    total_days = monthrange(year, month)[1]
    month_completion = arrow.now().day / total_days

    deals = Deal.objects.filter(owner=agent.name, created_at__month=month, created_at__year=year)
    actual_by_source = Counter(_get_new_source(deal.source) for deal in deals)

    result = {}
    for target in targets:
        source_name = target.source.name
        result[source_name] = ChartRow(
            target=target.target,
            actual=actual_by_source[source_name],
            month_completion=month_completion,
        )
    print(result)
    return result
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment