Skip to content

Instantly share code, notes, and snippets.

@aaronpolhamus
Created March 19, 2025 18:35
Show Gist options
  • Save aaronpolhamus/c47efe359f4c0a049ce1180cd569054b to your computer and use it in GitHub Desktop.
Save aaronpolhamus/c47efe359f4c0a049ce1180cd569054b to your computer and use it in GitHub Desktop.
Test script for conversation_monitoring DAG with updated vest-ai endpoints
import os
import gspread
import multiprocessing
import pandas as pd
import re
from joblib import Parallel, delayed
from sqlalchemy import text
from analytics.airflow.dags import default_args
from analytics.etl.dbs import get_vest_bi_prod_engine
from analytics.etl.google_drive.utils import (
download_sheets_file,
save_df_to_sheet,
save_string_to_drive,
)
from analytics.etl.intercom.transcript_maker import (
detect_cx_interaction,
detect_pure_bot_convo,
make_convo_transcript,
)
from analytics.llms.utils import get_llm_response, get_vestai_token
# Credentials JSON handed to the Google Drive / Sheets helpers
# (used below with download_sheets_file).
DW_REPORT_CREDENTIALS = "credentials/northbound-prod-eed6cf87604b.json"
# Google Drive IDs. Only CUSTOMER_COMPLAINT_MASTER_REPORT is read by this
# script; the others are not referenced here (presumably kept to mirror the
# conversation_monitoring DAG — confirm before removing).
COMPLAINT_MAIN_DIRECTORY_ID = "1LKV5RAqMNwP4pYIK8XEkn8vpqkzyV5ps"
ORIGINAL_TRANSCRIPTS_FOLDER = "1lXKU6xG8JUpg4vIe0-HlKgVamhqXA6xr"
TRANSLATED_TRANSCRIPTS_FOLDER = "19OgEhsxuuI_MAeF4nnyCfvZXNfD8R6-8"
CUSTOMER_COMPLAINT_MASTER_REPORT = "1edND6ARfgCctGpvIxJIYRQC1zHJ39x3esiS5qo0ObX0"
KEYWORDS_FILE_ID = "102gzQvpZI6mgBh8hHFgpY7Y2HYdVP4B-dk-AWBTi-aQ"
# Customer identifier passed to get_vestai_token when requesting a token.
CUSTOMER_ID = "vest_compliance_bot"

# Host name of the vest-ai REST API, taken from the environment.
api_host = os.getenv("VEST_API_HOST")
if not api_host:
    # Fail fast: otherwise the f-string below silently produces
    # "https://None" and the failure surfaces later as an opaque
    # connection/token error inside get_vestai_token.
    raise RuntimeError("VEST_API_HOST environment variable is not set or empty")
rest_base_url = f"https://{api_host}"
# Fetched once at import time and reused for the LLM request below.
token = get_vestai_token(rest_base_url, CUSTOMER_ID)
def get_updated_conversations(start_date):
    """Load Intercom conversations whose ``updated_at`` is on/after ``start_date``.

    Each row is one ``intercom_convos`` record joined with its aggregated tag
    names. Where the first contact's ``external_id`` resolves through
    ``link_table`` -> ``account_info`` -> ``users``, the row also carries the
    account number and the user's name, e-mail, phone number and country
    abbreviation. Unresolved lookups leave those columns NULL (LEFT JOINs).

    Args:
        start_date: Inclusive lower bound for ``intercom_convos.updated_at``,
            bound as the ``:start_date`` query parameter.

    Returns:
        The ``pandas.DataFrame`` produced by ``pd.read_sql`` with the columns
        selected in the query (convo_id, tag_names, state, updated_at,
        total_interactions, conversation_parts, tags, account_number,
        user_name, user_email_address, user_telephone_number, country_abbv).

    NOTE(review): the plain (inner) JOIN on ``tags_array`` excludes
    conversations whose ``tags->'tags'`` array is empty, because
    ``jsonb_array_elements`` yields no rows for them. Confirm this is
    intended. The CTE also aggregates tags over the entire table rather than
    only the rows matching ``start_date``. ``first_name || ' ' || last_name``
    is NULL if either name is NULL.
    """
    query = text(
        """
WITH tags_array AS (
SELECT
convo_id,
array_agg(tag->>'name') AS tag_names
FROM intercom_convos,
jsonb_array_elements(tags->'tags') AS tag
GROUP BY convo_id
)
SELECT
ic.convo_id,
ta.tag_names,
ic.state,
ic.updated_at,
ic.conversation_parts->>'total_count' AS total_interactions,
ic.conversation_parts,
ic.tags,
ai.account_number,
u.first_name || ' ' || u.last_name AS user_name,
u.user_email_address,
u.user_telephone_number,
u.country_abbv
FROM intercom_convos ic
JOIN tags_array ta ON ic.convo_id = ta.convo_id
LEFT JOIN link_table lt ON lt.user_id = ic.contacts->'contacts'->0->>'external_id'
LEFT JOIN account_info ai ON lt.account_number = ai.account_number
LEFT JOIN users u ON ai.user_id = u.user_id
WHERE ic.updated_at >= :start_date;
"""
    )
    engine = get_vest_bi_prod_engine()
    query_params = {"start_date": start_date}
    return pd.read_sql(query, engine, params=query_params)
# Get last update date from the master report sheet.
master_df = download_sheets_file(
    DW_REPORT_CREDENTIALS,
    CUSTOMER_COMPLAINT_MASTER_REPORT,
)
master_df["updated_at"] = pd.to_datetime(master_df["updated_at"])
last_update = master_df["updated_at"].max()
if pd.isna(last_update):
    # An empty report (or one with only missing dates) gives NaT, which
    # cannot be bound as the :start_date SQL parameter.
    raise RuntimeError(
        "Master report has no updated_at values; cannot determine the start date"
    )

# Get conversations updated since the last update.
# NOTE(review): the query uses ">=", so conversations updated exactly at
# last_update (presumably already in the master report) are fetched again.
# Also confirm the sheet's timestamps and the DB's updated_at share a timezone.
df = get_updated_conversations(last_update)

# Flag bot-only and live-CX conversations. Each cell of the conversation_parts
# column is the JSON object selected by the query; the detectors receive its
# inner "conversation_parts" entry (assumes no row has a NULL object — TODO
# confirm).
df["bots_only"] = df["conversation_parts"].apply(
    lambda parts: detect_pure_bot_convo(parts["conversation_parts"]),
)
df["live_cx_interaction"] = df["conversation_parts"].apply(
    lambda parts: detect_cx_interaction(parts["conversation_parts"]),
)
# .copy() so the "transcript" assignment below writes to an independent
# frame rather than a possible view of the unfiltered one
# (avoids SettingWithCopyWarning).
df = df[df["live_cx_interaction"]].copy()
if df.empty:
    # Without this guard, df.iloc[0] below fails with a bare IndexError.
    raise SystemExit(
        f"No live CX conversations updated since {last_update}; nothing to test."
    )

# Generate transcripts.
df["transcript"] = df.apply(make_convo_transcript, axis=1)

# Send the first transcript to the conversation_monitoring service bot.
_transcript = df.iloc[0]["transcript"]
_message = {"role": "user", "content": _transcript}
payload = {
    "service_bot_type": "conversation_monitoring",
    "message": _message,
}
json_response = get_llm_response(base_url=rest_base_url, payload=payload, token=token)
# Surface the result; this is a test script and the response was
# previously computed and discarded.
print(json_response)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment