Created
March 19, 2025 18:35
-
-
Save aaronpolhamus/c47efe359f4c0a049ce1180cd569054b to your computer and use it in GitHub Desktop.
Test script for conversation_monitoring DAG with updated vest-ai endpoints
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import gspread | |
import multiprocessing | |
import pandas as pd | |
import re | |
from joblib import Parallel, delayed | |
from sqlalchemy import text | |
from analytics.airflow.dags import default_args | |
from analytics.etl.dbs import get_vest_bi_prod_engine | |
from analytics.etl.google_drive.utils import ( | |
download_sheets_file, | |
save_df_to_sheet, | |
save_string_to_drive, | |
) | |
from analytics.etl.intercom.transcript_maker import ( | |
detect_cx_interaction, | |
detect_pure_bot_convo, | |
make_convo_transcript, | |
) | |
from analytics.llms.utils import get_llm_response, get_vestai_token | |
# Credentials path handed to the Google Sheets download helper below.
DW_REPORT_CREDENTIALS = "credentials/northbound-prod-eed6cf87604b.json"

# Opaque identifiers for the complaint-monitoring resources
# (presumably Google Drive folder / Sheets file IDs -- confirm with the owner).
COMPLAINT_MAIN_DIRECTORY_ID = "1LKV5RAqMNwP4pYIK8XEkn8vpqkzyV5ps"
ORIGINAL_TRANSCRIPTS_FOLDER = "1lXKU6xG8JUpg4vIe0-HlKgVamhqXA6xr"
TRANSLATED_TRANSCRIPTS_FOLDER = "19OgEhsxuuI_MAeF4nnyCfvZXNfD8R6-8"
CUSTOMER_COMPLAINT_MASTER_REPORT = "1edND6ARfgCctGpvIxJIYRQC1zHJ39x3esiS5qo0ObX0"
KEYWORDS_FILE_ID = "102gzQvpZI6mgBh8hHFgpY7Y2HYdVP4B-dk-AWBTi-aQ"

# Customer identifier used when requesting a vest-ai token.
CUSTOMER_ID = "vest_compliance_bot"

api_host = os.getenv("VEST_API_HOST")
if not api_host:
    # os.getenv returns None for a missing variable; without this guard the
    # base URL would silently become "https://None".
    raise RuntimeError(
        "VEST_API_HOST environment variable is not set; "
        "cannot build the vest-ai base URL."
    )
rest_base_url = f"https://{api_host}"
token = get_vestai_token(rest_base_url, CUSTOMER_ID)
def get_updated_conversations(start_date):
    """Load Intercom conversations updated on or after ``start_date``.

    The query aggregates each conversation's tag names into an array, then
    joins the conversation rows to account and user details. Because the
    tag CTE is inner-joined, only conversations that yield at least one row
    in ``tags_array`` are returned.

    Args:
        start_date: Lower bound (inclusive) bound to the ``:start_date``
            parameter and compared against ``intercom_convos.updated_at``.

    Returns:
        The ``pandas.DataFrame`` produced by ``pd.read_sql`` for the query.
    """
    conversations_query = text(
        """
        WITH tags_array AS (
        SELECT
        convo_id,
        array_agg(tag->>'name') AS tag_names
        FROM intercom_convos,
        jsonb_array_elements(tags->'tags') AS tag
        GROUP BY convo_id
        )
        SELECT
        ic.convo_id,
        ta.tag_names,
        ic.state,
        ic.updated_at,
        ic.conversation_parts->>'total_count' AS total_interactions,
        ic.conversation_parts,
        ic.tags,
        ai.account_number,
        u.first_name || ' ' || u.last_name AS user_name,
        u.user_email_address,
        u.user_telephone_number,
        u.country_abbv
        FROM intercom_convos ic
        JOIN tags_array ta ON ic.convo_id = ta.convo_id
        LEFT JOIN link_table lt ON lt.user_id = ic.contacts->'contacts'->0->>'external_id'
        LEFT JOIN account_info ai ON lt.account_number = ai.account_number
        LEFT JOIN users u ON ai.user_id = u.user_id
        WHERE ic.updated_at >= :start_date;
        """,
    )
    bi_engine = get_vest_bi_prod_engine()
    bound_params = {"start_date": start_date}
    return pd.read_sql(sql=conversations_query, con=bi_engine, params=bound_params)
# Get last update date from master report
master_df = download_sheets_file(
    DW_REPORT_CREDENTIALS,
    CUSTOMER_COMPLAINT_MASTER_REPORT,
)
master_df["updated_at"] = pd.to_datetime(master_df["updated_at"])
last_update = master_df["updated_at"].max()

# Get new conversations since last update
df = get_updated_conversations(last_update)

# Flag bot-only conversations and conversations with a live CX interaction.
# Each cell of "conversation_parts" is indexed by its own "conversation_parts"
# key before being handed to the detectors.
df["bots_only"] = df["conversation_parts"].apply(
    lambda x: detect_pure_bot_convo(x["conversation_parts"]),
)
df["live_cx_interaction"] = df["conversation_parts"].apply(
    lambda x: detect_cx_interaction(x["conversation_parts"]),
)

# Filter for live CX interactions. Take an explicit copy so that adding the
# "transcript" column below writes to an independent frame rather than to a
# slice of the unfiltered one.
df = df[df["live_cx_interaction"]].copy()

# Fail with a clear message instead of an IndexError from df.iloc[0] when
# nothing survives the filter.
if df.empty:
    raise RuntimeError(
        f"No live CX conversations found updated since {last_update}; "
        "nothing to send to the conversation_monitoring endpoint."
    )

# Generate transcripts
df["transcript"] = df.apply(make_convo_transcript, axis=1)

# Send the first transcript to the conversation_monitoring service bot.
_transcript = df.iloc[0]["transcript"]
_message = {"role": "user", "content": _transcript}
payload = {
    "service_bot_type": "conversation_monitoring",
    "message": _message,
}
json_response = get_llm_response(base_url=rest_base_url, payload=payload, token=token)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment