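"""Build campaign contact lists from the Excel workbooks in ``dir_path``.

For each workbook (files whose names contain "Suppression" or "Columns" are
skipped), the script reads a target-account-list (TAL) sheet of domains and a
persona sheet of job levels, job functions, and optional job-title
keywords/exclusions. It checks the persona values against the distinct
seniority_level_2 and department_2 values in Snowflake, loads the TAL domains
into a working table, pulls up to 100,000 matching contacts from
DATAFEED.DBO.DEMAND_GEN_UNIVERSAL_PERSON_CONTACTS (backfilling with
seniority/department-only matches when keyword matches fall short), writes the
results to a SOURCE.DEV table, and exports them as a CSV named after the
campaign table.
"""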
import csv
import os
import pandas as pd
from intentsify.common.db.snowflake_manager import SnowflakeManager
if __name__ == "__main__":
    dir_path = "/Users/johndeyrup/Documents/intentsify/ai-insights/src/bin/contact_data"
    files = os.listdir(dir_path)
    completed_lists = []
    seniority_levels = [x[0] for x in SnowflakeManager.exec_select("select distinct(seniority_level_2) FROM DATAFEED.DBO.DEMAND_GEN_UNIVERSAL_PERSON_CONTACTS")]
    department_levels = [x[0] for x in SnowflakeManager.exec_select("select distinct(department_2) FROM DATAFEED.DBO.DEMAND_GEN_UNIVERSAL_PERSON_CONTACTS")]
    for file_name in files:
        file_path = os.path.join(dir_path, file_name)
        if "Suppression" not in file_path and "Columns" not in file_path:
            # Each campaign workbook holds a TAL (target account list) sheet and a persona sheet.
            sheets = pd.ExcelFile(file_path)
            tal, persona = sheets.sheet_names
            tal_df = pd.read_excel(file_path, sheet_name=tal).drop_duplicates()
            domain = "Domain" if "Domain" in tal_df.columns.values else "domain"
            tal_df.rename(columns={domain: "DOMAIN"}, inplace=True)
            persona_df = pd.read_excel(file_path, sheet_name=persona)
            # Persona column names vary slightly between workbooks.
            if "Job Level" in persona_df.columns.values:
                job_level_column = "Job Level"
            elif "Job Levels" in persona_df.columns.values:
                job_level_column = "Job Levels"
            else:
                job_level_column = "Job levels"
            job_levels = persona_df[job_level_column][persona_df[job_level_column].notna()].values
            # Map the sheet's job levels to the values used by seniority_level_2 and quote them for SQL.
            cleaned_job_levels = []
            for x in job_levels:
                x = x.strip().lower()
                if x == 'c-level':
                    cleaned_job_levels.append("'cxo'")
                elif x == 'vp':
                    cleaned_job_levels.append("'vice president'")
                else:
                    cleaned_job_levels.append(f"'{x}'")
            job_levels = cleaned_job_levels
            job_function_column = 'Job Function' if "Job Function" in persona_df.columns.values else "Job Functions"
            job_functions = persona_df[job_function_column][persona_df[job_function_column].notna()].values
            job_functions = [f"'{x.strip().lower()}'" for x in job_functions]
            job_keyword_column = 'Job title keywords' if "Job title keywords" in persona_df.columns.values else "Job Title Keywords"
            table_name = persona_df.iloc[1, 0].replace(" ", "_")
            # Flag persona job functions that are not present in department_2.
            missing_job_functions = []
            possible = set(department_levels)
            for x in set(job_functions):
                new_x = x.replace("'", "")
                if new_x not in possible:
                    missing_job_functions.append(new_x)
            if len(missing_job_functions) > 0:
                print("Missing", file_name, ", ".join(missing_job_functions))
            # Flag persona job levels that are not present in seniority_level_2.
            missing_job_levels = []
            possible = set(seniority_levels)
            for x in set(job_levels):
                new_x = x.replace("'", "")
                if new_x not in possible:
                    missing_job_levels.append(new_x)
            if len(missing_job_levels) > 0:
                print("Missing", file_name, ", ".join(missing_job_levels))
print("Writing", table_name)
exclusion_column = ""
if 'Job title exclusion keywords' in persona_df.columns.values:
exclusion_column = 'Job title exclusion keywords'
elif 'Job Title exclusion keywords' in persona_df.columns.values:
exclusion_column = 'Job Title exclusion keywords'
elif 'Job title keyword Exclusion' in persona_df.columns.values:
exclusion_column = 'Job title keyword Exclusion'
            # Build the WHERE predicate: title keywords (with optional exclusions) plus
            # seniority and department filters.
            query = ""
            has_exclusion = False
            has_keyword = False
            if pd.isna(persona_df.at[0, job_keyword_column]):
                query = f"lower(seniority_level_2) in ({','.join(job_levels)}) and lower(department_2) in ({','.join(job_functions)})"
            elif exclusion_column == "" or pd.isna(persona_df.at[0, exclusion_column]):
                job_title_keywords = [f"'%{x.lower()}%'" for x in persona_df[job_keyword_column][persona_df[job_keyword_column].notna()].values]
                query = f"lower(job_title) like any ({','.join(job_title_keywords)}) and lower(seniority_level_2) in ({','.join(job_levels)}) and lower(department_2) in ({','.join(job_functions)})"
                has_keyword = True
            else:
                job_title_keywords = [f"'%{x.lower()}%'" for x in persona_df[job_keyword_column][persona_df[job_keyword_column].notna()].values]
                job_title_exclusion_keywords = [f"'%{x.lower()}%'" for x in persona_df[exclusion_column][persona_df[exclusion_column].notna()].values]
                query = f'lower(job_title) like any ({",".join(job_title_keywords)}) and not (lower(job_title) like any ({",".join(job_title_exclusion_keywords)})) and lower(seniority_level_2) in ({",".join(job_levels)}) and lower(department_2) in ({",".join(job_functions)})'
                has_exclusion = True
                has_keyword = True
            # Load the TAL domains into a working table used for the join below.
            base_table = f"{table_name}_domain"
            domain_create = f'CREATE OR REPLACE TABLE "SOURCE"."PROD"."{base_table}" LIKE DEVELOPER.PUBLIC.DOMAINS_FOR_SARAH;'
            SnowflakeManager.exec_query(domain_create)
            SnowflakeManager.write_from_df(tal_df, base_table, "PROD", "SOURCE")
            # RANK 2 = matched on title keywords, RANK 3 = matched on seniority/department only.
            base_query = f"""
            SELECT first_name, last_name, business_email, phone_number, company_name, job_title, 'N/A' as Industry,
            'N/A' as company_size, seniority_level_2 as job_level, department_2 as job_function, 'N/A' as email_option, 'N/A' as asset_downloaded,
            company_address as address, company_address as address2, company_city as city, company_state as state,
            company_zip as postal_code, 'N/A' as country, 'N/A' as lead_download_date, 'N/A' as custom_question_1,
            'N/A' as custom_question_2, 'N/A' as custom_question_3, 'N/A' as custom_question_4,
            'N/A' as custom_question_5, 'N/A' as custom_question_6, 'N/A' as custom_question_7,
            'N/A' as custom_question_8, 'N/A' as custom_question_9, linkedin_url as linedinprofileurl, {'2 as RANK' if has_keyword else '3 as RANK'}
            FROM DATAFEED.DBO.DEMAND_GEN_UNIVERSAL_PERSON_CONTACTS pc
            join "SOURCE"."PROD"."{base_table}" d
            on pc.company_domain = d.domain
            left join developer.public.supress s
            on pc.business_email = s.email
            where {query} and s.email is null
            ORDER BY RANDOM(1)
            LIMIT 100000;
            """
            results = SnowflakeManager.exec_select(base_query)
            SnowflakeManager.exec_query(f"CREATE OR REPLACE TABLE SOURCE.DEV.{base_table} as " + base_query)
            exclusion_query = ""
            if has_exclusion:
                exclusion_query = f' AND not (lower(bt.job_title) like any ({",".join(job_title_exclusion_keywords)}))'
            # Backfill up to the 100,000 cap with contacts that match only on seniority/department.
            if len(results) < 100000:
                new_query = f"""
                with base_target as (select * FROM DATAFEED.DBO.DEMAND_GEN_UNIVERSAL_PERSON_CONTACTS pc
                join "SOURCE"."PROD"."{base_table}" d
                on pc.company_domain = d.domain
                left join developer.public.supress s
                on pc.business_email = s.email
                where s.email is null),
                keyword_target as (select * from base_target where {query}),
                function_target as (select bt.* from base_target bt left join keyword_target kt on bt.business_email = kt.business_email
                where lower(bt.seniority_level_2) in ({','.join(job_levels)}) and lower(bt.department_2) in ({','.join(job_functions)}) and kt.business_email is null{exclusion_query})
                select first_name, last_name, business_email, phone_number, company_name, job_title, 'N/A' as Industry,
                'N/A' as company_size, seniority_level_2 as job_level, department_2 as job_function, 'N/A' as email_option, 'N/A' as asset_downloaded,
                company_address as address, company_address as address2, company_city as city, company_state as state,
                company_zip as postal_code, 'N/A' as country, 'N/A' as lead_download_date, 'N/A' as custom_question_1,
                'N/A' as custom_question_2, 'N/A' as custom_question_3, 'N/A' as custom_question_4,
                'N/A' as custom_question_5, 'N/A' as custom_question_6, 'N/A' as custom_question_7,
                'N/A' as custom_question_8, 'N/A' as custom_question_9, linkedin_url as linedinprofileurl, 3 as RANK
                from function_target order by random(1) limit {100000 - len(results)};
                """
                results2 = SnowflakeManager.exec_select(new_query)
                results += results2
                SnowflakeManager.exec_query(f"INSERT INTO SOURCE.DEV.{base_table} " + new_query)
            print(len(results))
            # Column order matches the SELECT lists above, including the trailing RANK column.
            header = ["first_name", "last_name", "business_email", "phone_number", "company_name", "job_title", "Industry", "company_size", "job_level", "job_function", "email_option", "asset_downloaded", "address", "address2", "city", "state", "postal_code", "country", "lead_download_date", "custom_question_1", "custom_question_2", "custom_question_3", "custom_question_4", "custom_question_5", "custom_question_6", "custom_question_7", "custom_question_8", "custom_question_9", "linedinprofileurl", "rank"]
            with open(table_name + ".csv", 'w', newline='') as file:
                writer = csv.writer(file)
                writer.writerow(header)
                writer.writerows(results)
            completed_lists.append(table_name)
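
# Illustrative note (values below are made up, not from the original workbooks):
# for a persona sheet with job levels ["C-Level", "VP"], job function
# ["Marketing"], and the title keyword "growth", the predicate assembled above
# would look roughly like:
#
#   lower(job_title) like any ('%growth%')
#   and lower(seniority_level_2) in ('cxo','vice president')
#   and lower(department_2) in ('marketing')
#
# "C-Level" and "VP" are remapped to 'cxo' and 'vice president' so they match
# the values stored in seniority_level_2.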