Last active
April 24, 2025 19:04
-
-
Save johndeyrup/832dad2cba0ba9337b7a946c8b4115c5 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import csv
import os

import pandas as pd

from intentsify.common.db.snowflake_manager import SnowflakeManager
# Maximum number of contacts exported per list.
_ROW_LIMIT = 100000

# Spreadsheet job-level labels that are spelled differently in the
# seniority_level_2 column.
_JOB_LEVEL_ALIASES = {"c-level": "cxo", "vp": "vice president"}

# Projection shared by both export queries; each query appends its own
# "<n> as RANK" column after it.
_SELECT_COLUMNS = """first_name, last_name, business_email, phone_number, company_name, job_title, 'N/A' as Industry,
'N/A' as company_size, seniority_level_2 as job_level, department_2 as job_function, 'N/A' as email_option, 'N/A' as asset_downloaded,
company_address as address, company_address as address2, company_city as city, company_state as state,
company_zip as postal_code, 'N/A' as country, 'N/A' as lead_download_date, 'N/A' as custom_question_1,
'N/A' as custom_question_2, 'N/A' as custom_question_3, 'N/A' as custom_question_4,
'N/A' as custom_question_5, 'N/A' as custom_question_6, 'N/A' as custom_question_7,
'N/A' as custom_question_8, 'N/A' as custom_question_9, linkedin_url as linedinprofileurl"""

# One heading per selected column, in SELECT order (including the trailing
# RANK column), so the CSV header lines up with every data row.
_CSV_HEADER = [
    "first_name", "last_name", "business_email", "phone_number",
    "company_name", "job_title", "Industry", "company_size", "job_level",
    "job_function", "email_option", "asset_downloaded", "address",
    "address2", "city", "state", "postal_code", "country",
    "lead_download_date", "custom_question_1", "custom_question_2",
    "custom_question_3", "custom_question_4", "custom_question_5",
    "custom_question_6", "custom_question_7", "custom_question_8",
    "custom_question_9", "linedinprofileurl", "RANK",
]


def _first_present(columns, candidates, default):
    """Return the first name from *candidates* found in *columns*, else *default*."""
    for name in candidates:
        if name in columns:
            return name
    return default


def _non_null(frame, column):
    """Return the non-null values of *column* in *frame* as a list."""
    return frame[column].dropna().tolist()


def _sql_literal(value):
    """Render *value* as a single-quoted SQL string literal.

    Backslashes and single quotes are escaped so that spreadsheet text such
    as ``O'Brien`` cannot terminate the literal early.
    """
    escaped = str(value).replace("\\", "\\\\").replace("'", "''")
    return f"'{escaped}'"


def _sql_list(values):
    """Join *values* into a comma-separated list of SQL string literals."""
    return ",".join(_sql_literal(value) for value in values)


def _distinct_lowercase(column):
    """Return the distinct non-null values of *column*, lower-cased.

    The export queries compare against ``lower(<column>)``, so the reference
    set is lower-cased the same way.
    """
    rows = SnowflakeManager.exec_select(
        f"select distinct({column}) FROM DATAFEED.DBO.DEMAND_GEN_UNIVERSAL_PERSON_CONTACTS"
    )
    return {str(row[0]).lower() for row in rows if row[0] is not None}


def _report_missing(file_name, requested, known):
    """Print the requested values that do not occur in *known*, if any."""
    missing = sorted(set(requested) - set(known))
    if missing:
        print("Missing", file_name, ", ".join(missing))


if __name__ == "__main__":
    dir_path = "/Users/johndeyrup/Documents/intentsify/ai-insights/src/bin/contact_data"
    completed_lists = []
    seniority_levels = _distinct_lowercase("seniority_level_2")
    department_levels = _distinct_lowercase("department_2")

    for file_name in os.listdir(dir_path):
        # Hidden files (.DS_Store) and Office lock files (~$...) are not workbooks.
        if file_name.startswith((".", "~$")):
            continue
        if "Suppression" in file_name or "Columns" in file_name:
            continue
        file_path = os.path.join(dir_path, file_name)

        # Each workbook must hold exactly two sheets: the target account list
        # followed by the persona definition.
        with pd.ExcelFile(file_path) as workbook:
            tal, persona = workbook.sheet_names
            tal_df = workbook.parse(tal).drop_duplicates()
            persona_df = workbook.parse(persona)

        domain = _first_present(tal_df.columns, ("Domain",), "domain")
        tal_df = tal_df.rename(columns={domain: "DOMAIN"})

        # Column headings vary in capitalisation and plurality between workbooks.
        job_level_column = _first_present(
            persona_df.columns, ("Job Level", "Job Levels"), "Job levels"
        )
        job_function_column = _first_present(
            persona_df.columns, ("Job Function",), "Job Functions"
        )
        job_keyword_column = _first_present(
            persona_df.columns, ("Job title keywords",), "Job Title Keywords"
        )
        exclusion_column = _first_present(
            persona_df.columns,
            (
                "Job title exclusion keywords",
                "Job Title exclusion keywords",
                "Job title keyword Exclusion",
            ),
            "",
        )

        job_levels = []
        for raw_level in _non_null(persona_df, job_level_column):
            level = str(raw_level).strip().lower()
            job_levels.append(_JOB_LEVEL_ALIASES.get(level, level))
        job_functions = [
            str(raw_function).strip().lower()
            for raw_function in _non_null(persona_df, job_function_column)
        ]
        if not job_levels or not job_functions:
            # An empty list would render as "in ()", which is not valid SQL.
            raise ValueError(
                f"{file_name}: persona sheet lists no job levels or job functions"
            )

        # The list name is read from the second row of the first persona column.
        table_name = persona_df.iloc[1, 0].replace(" ", "_")

        _report_missing(file_name, job_functions, department_levels)
        _report_missing(file_name, job_levels, seniority_levels)
        print("Writing", table_name)

        # Only the first row decides whether keyword / exclusion filtering
        # applies; the remaining non-null rows supply additional patterns.
        has_keyword = not pd.isna(persona_df.at[0, job_keyword_column])
        has_exclusion = (
            has_keyword
            and exclusion_column != ""
            and not pd.isna(persona_df.at[0, exclusion_column])
        )

        level_sql = _sql_list(job_levels)
        function_sql = _sql_list(job_functions)
        exclusion_sql = ""
        clauses = []
        if has_keyword:
            keyword_sql = _sql_list(
                f"%{str(keyword).lower()}%"
                for keyword in _non_null(persona_df, job_keyword_column)
            )
            clauses.append(f"lower(job_title) like any ({keyword_sql})")
        if has_exclusion:
            exclusion_sql = _sql_list(
                f"%{str(keyword).lower()}%"
                for keyword in _non_null(persona_df, exclusion_column)
            )
            clauses.append(f"not (lower(job_title) like any ({exclusion_sql}))")
        clauses.append(
            f"lower(seniority_level_2) in ({level_sql}) and lower(department_2) in ({function_sql})"
        )
        query = " and ".join(clauses)

        base_table = f"{table_name}_domain"
        domain_create = f'CREATE OR REPLACE TABLE "SOURCE"."PROD"."{base_table}" LIKE DEVELOPER.PUBLIC.DOMAINS_FOR_SARAH;'
        SnowflakeManager.exec_query(domain_create)
        SnowflakeManager.write_from_df(tal_df, base_table, "PROD", "SOURCE")

        # Keyword matches are ranked 2; level/function-only matches are ranked 3.
        primary_rank = 2 if has_keyword else 3
        base_query = f"""
SELECT {_SELECT_COLUMNS}, {primary_rank} as RANK
FROM DATAFEED.DBO.DEMAND_GEN_UNIVERSAL_PERSON_CONTACTS pc
join "SOURCE"."PROD"."{base_table}" d
on pc.company_domain = d.domain
left join developer.public.supress s
on pc.business_email = s.email
where {query} and s.email is null
ORDER BY RANDOM(1)
LIMIT {_ROW_LIMIT};
"""
        results = SnowflakeManager.exec_select(base_query)
        SnowflakeManager.exec_query(
            f"CREATE OR REPLACE TABLE SOURCE.DEV.{base_table} as " + base_query
        )

        if len(results) < _ROW_LIMIT:
            # Top the list up with contacts that match level and function but
            # were not already selected by the primary query.
            exclusion_query = ""
            if has_exclusion:
                exclusion_query = (
                    f" AND not (lower(bt.job_title) like any ({exclusion_sql}))"
                )
            function_filter = (
                f"lower(bt.seniority_level_2) in ({level_sql})"
                f" and lower(bt.department_2) in ({function_sql})"
            )
            new_query = f"""
with base_target as (select * FROM DATAFEED.DBO.DEMAND_GEN_UNIVERSAL_PERSON_CONTACTS pc
join "SOURCE"."PROD"."{base_table}" d
on pc.company_domain = d.domain
left join developer.public.supress s
on pc.business_email = s.email
where s.email is null),
keyword_target as (select * from base_target where {query}),
function_target as (select bt.* from base_target bt left join keyword_target kt on bt.business_email = kt.business_email
where {function_filter} and kt.business_email is null{exclusion_query})
select {_SELECT_COLUMNS}, 3 as RANK
from function_target order by random(1) limit {_ROW_LIMIT - len(results)};
"""
            results2 = SnowflakeManager.exec_select(new_query)
            results += results2
            SnowflakeManager.exec_query(
                f"INSERT INTO SOURCE.DEV.{base_table} " + new_query
            )

        print(len(results))
        with open(table_name + ".csv", "w", newline="", encoding="utf-8") as file:
            writer = csv.writer(file)
            writer.writerow(_CSV_HEADER)
            writer.writerows(results)
        completed_lists.append(table_name)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment