Syncing Data from Azure AI Search to YugabyteDB
import requests
import psycopg2
import re

# 🔹 Azure AI Search Configuration
AZURE_SEARCH_ENDPOINT = "https://your-endpoint.search.windows.net"
AZURE_SEARCH_INDEX = "insurance-claims-index-v2"
AZURE_SEARCH_API_KEY = "your-search-key"

# 🔹 YugabyteDB Connection Configuration
YB_DB_HOST = "your-db-host"
YB_DB_PORT = "5433"
YB_DB_NAME = "yugabyte"
YB_DB_USER = "yugabyte"
YB_DB_PASSWORD = "your-password"

# 🔹 Function to fetch data from Azure AI Search
def fetch_documents():
    url = f"{AZURE_SEARCH_ENDPOINT}/indexes/{AZURE_SEARCH_INDEX}/docs?api-version=2023-07-01-Preview&search=*"
    headers = {
        "api-key": AZURE_SEARCH_API_KEY,
        "Content-Type": "application/json"
    }
    response = requests.get(url, headers=headers)
    if response.status_code == 200:
        documents = response.json().get("value", [])
        print(f"Documents fetched: {documents}")  # Log the documents for inspection
        return documents
    else:
        print(f"❌ Error: {response.status_code} - {response.text}")
        return []

# 🔹 Function to parse the content and extract claim details
def extract_claim_details(content):
    claim_details = {}
    claim_details["claim_id"] = re.search(r"Claim ID:\s*([^\n]+)", content)
    claim_details["policy_id"] = re.search(r"Policy ID:\s*([^\n]+)", content)
    claim_details["claimant_name"] = re.search(r"Claimant Name:\s*([^\n]+)", content)
    claim_details["claim_status"] = re.search(r"Claim Status:\s*([^\n]+)", content)
    claim_details["claim_amount"] = re.search(r"Claim Amount:\s*\$?([0-9.,]+)", content)
    claim_details["claim_date"] = re.search(r"Claim Date:\s*([^\n]+)", content)
    claim_details["document_url"] = re.search(r"Document URL:\s*([^\n]+)", content)
    claim_details["claim_description"] = re.search(r"Claim Description:\s*([^\n]+)", content)
    # Extract the matched value if found, otherwise set the field to None
    return {key: match.group(1) if match else None for key, match in claim_details.items()}

# 🔹 Function to insert data into YugabyteDB
def insert_into_yugabyte(doc):
    # Strip any leading or trailing whitespace from each field
    doc = {key: value.strip() if isinstance(value, str) else value for key, value in doc.items()}
    print(f"Inserting document: {doc}")  # Log the document to verify the cleaned data
    conn = None
    try:
        conn = psycopg2.connect(
            dbname=YB_DB_NAME, user=YB_DB_USER, password=YB_DB_PASSWORD,
            host=YB_DB_HOST, port=YB_DB_PORT
        )
        cursor = conn.cursor()
        sql = """
            INSERT INTO claims (
                claim_id, policy_id, claimant_name, claim_status,
                claim_amount, claim_description, claim_date, document_url
            ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
            ON CONFLICT (claim_id) DO UPDATE
            SET policy_id = EXCLUDED.policy_id,
                claimant_name = EXCLUDED.claimant_name,
                claim_status = EXCLUDED.claim_status,
                claim_amount = EXCLUDED.claim_amount,
                claim_description = EXCLUDED.claim_description,
                claim_date = EXCLUDED.claim_date,
                document_url = EXCLUDED.document_url;
        """
        cursor.execute(sql, (
            doc["claim_id"], doc["policy_id"], doc["claimant_name"], doc["claim_status"],
            doc["claim_amount"], doc.get("claim_description", ""), doc["claim_date"], doc["document_url"]
        ))
        conn.commit()
        print(f"✅ Inserted: {doc['claim_id']}")
    except Exception as e:
        print(f"❌ Database Error: {e}")
    finally:
        if conn:
            conn.close()

# 🔹 Main script to sync data
def sync_data():
    documents = fetch_documents()
    for doc in documents:
        content = doc.get("content", "")
        extracted_data = extract_claim_details(content)
        insert_into_yugabyte(extracted_data)

# Run the sync process
if __name__ == "__main__":
    sync_data()
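
The upsert above assumes a `claims` table already exists with a primary key or unique constraint on `claim_id` (required for `ON CONFLICT (claim_id)` to work). Below is a minimal one-time setup sketch, not part of the original script: it reuses the YB_DB_* settings defined above, and the column types (all TEXT) are assumptions that keep the regex-extracted strings as-is; adjust them to match your schema.

import psycopg2

def create_claims_table():
    # Hypothetical helper: creates the table the sync script upserts into.
    # claim_id is the primary key so ON CONFLICT (claim_id) can resolve duplicates.
    conn = psycopg2.connect(
        dbname=YB_DB_NAME, user=YB_DB_USER, password=YB_DB_PASSWORD,
        host=YB_DB_HOST, port=YB_DB_PORT
    )
    try:
        with conn.cursor() as cursor:
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS claims (
                    claim_id          TEXT PRIMARY KEY,
                    policy_id         TEXT,
                    claimant_name     TEXT,
                    claim_status      TEXT,
                    claim_amount      TEXT,
                    claim_description TEXT,
                    claim_date        TEXT,
                    document_url      TEXT
                );
            """)
        conn.commit()
    finally:
        conn.close()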
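
For reference, the regexes in extract_claim_details expect each indexed document's content field to hold one labeled value per line. A quick illustration of the parsing step, using made-up sample values and the extract_claim_details function from the script above:

# Hypothetical sample of a document "content" field; the labels match the
# regex patterns in extract_claim_details, the values are illustrative only.
sample_content = (
    "Claim ID: CLM-1001\n"
    "Policy ID: POL-2002\n"
    "Claimant Name: Jane Doe\n"
    "Claim Status: Approved\n"
    "Claim Amount: $1,250.00\n"
    "Claim Date: 2025-03-01\n"
    "Document URL: https://example.com/claims/CLM-1001.pdf\n"
    "Claim Description: Windshield replacement\n"
)

print(extract_claim_details(sample_content))
# {'claim_id': 'CLM-1001', 'policy_id': 'POL-2002', 'claimant_name': 'Jane Doe', ...}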