Skip to content

Instantly share code, notes, and snippets.

@bhoitea
Created March 18, 2025 06:22
Show Gist options
  • Save bhoitea/d379d27f1aff8ccf5992acce82187cca to your computer and use it in GitHub Desktop.
Syncing Data from Azure AI Search to YugabyteDB
import requests
import psycopg2
import re
# 🔹 Azure AI Search Configuration
# NOTE(review): these are hardcoded placeholder credentials — load them from
# environment variables or a secrets manager before deploying.
AZURE_SEARCH_ENDPOINT = "https://your-endpoint.search.windows.net"
AZURE_SEARCH_INDEX = "insurance-claims-index-v2"
AZURE_SEARCH_API_KEY = "your-search-key"
# 🔹 YugabyteDB Connection Configuration
# YugabyteDB speaks the PostgreSQL wire protocol; 5433 is its default YSQL port.
YB_DB_HOST = "your-db-host"
YB_DB_PORT = "5433"
YB_DB_NAME = "yugabyte"
YB_DB_USER = "yugabyte"
YB_DB_PASSWORD = "your-password"
# ๐Ÿ”น Function to fetch data from Azure AI Search
def fetch_documents(timeout=30):
    """Fetch all documents from the Azure AI Search index.

    Issues a single ``search=*`` GET against the index's docs endpoint and
    returns the ``value`` list from the JSON response.

    Args:
        timeout: Seconds to wait for the HTTP request before giving up.
            (Fix: the original call had no timeout and could hang forever.)

    Returns:
        list: The documents from the index, or ``[]`` on any HTTP or
        network-level failure (errors are logged, never raised).
    """
    url = f"{AZURE_SEARCH_ENDPOINT}/indexes/{AZURE_SEARCH_INDEX}/docs?api-version=2023-07-01-Preview&search=*"
    headers = {
        "api-key": AZURE_SEARCH_API_KEY,
        "Content-Type": "application/json",
    }
    try:
        response = requests.get(url, headers=headers, timeout=timeout)
    except requests.RequestException as e:
        # Fix: a connection/timeout error previously crashed the whole sync.
        print(f"❌ Error: request failed - {e}")
        return []
    if response.status_code == 200:
        documents = response.json().get("value", [])
        print(f"Documents fetched: {documents}")  # Log the documents for inspection
        return documents
    print(f"❌ Error: {response.status_code} - {response.text}")
    return []
# ๐Ÿ”น Function to parse the content and extract claim details
def extract_claim_details(content):
    """Parse free-text document content into a dict of claim fields.

    Each field is pulled with a labelled regex; a field whose label is not
    present in *content* maps to ``None``.

    Args:
        content: The raw text of one search document's ``content`` field.

    Returns:
        dict: Keys ``claim_id``, ``policy_id``, ``claimant_name``,
        ``claim_status``, ``claim_amount``, ``claim_date``, ``document_url``
        and ``claim_description``; values are captured strings or ``None``.
    """
    field_patterns = {
        "claim_id": r"Claim ID:\s*([^\n]+)",
        "policy_id": r"Policy ID:\s*([^\n]+)",
        "claimant_name": r"Claimant Name:\s*([^\n]+)",
        "claim_status": r"Claim Status:\s*([^\n]+)",
        "claim_amount": r"Claim Amount:\s*\$?([0-9.,]+)",
        "claim_date": r"Claim Date:\s*([^\n]+)",
        "document_url": r"Document URL:\s*([^\n]+)",
        "claim_description": r"Claim Description:\s*([^\n]+)",
    }
    details = {}
    for field, pattern in field_patterns.items():
        found = re.search(pattern, content)
        details[field] = found.group(1) if found else None
    return details
# ๐Ÿ”น Function to insert data into YugabyteDB
def insert_into_yugabyte(doc):
    """Upsert one claim record into the YugabyteDB ``claims`` table.

    Opens a fresh connection, executes an ``INSERT ... ON CONFLICT`` keyed on
    ``claim_id``, and commits. Database errors are logged, never raised.

    Args:
        doc: Dict produced by ``extract_claim_details`` — string values (or
            ``None`` for missing fields) keyed by column name.
    """
    # Strip any leading or trailing whitespace from each field
    doc = {key: value.strip() if isinstance(value, str) else value
           for key, value in doc.items()}
    # Fix: the amount regex can capture thousands separators ("1,234.56"),
    # which a numeric column would reject — drop the commas before insert.
    if isinstance(doc.get("claim_amount"), str):
        doc["claim_amount"] = doc["claim_amount"].replace(",", "")
    print(f"Inserting document: {doc}")  # Log the document to verify the cleaned data
    conn = None
    try:
        conn = psycopg2.connect(
            dbname=YB_DB_NAME, user=YB_DB_USER, password=YB_DB_PASSWORD,
            host=YB_DB_HOST, port=YB_DB_PORT
        )
        sql = """
        INSERT INTO claims (
            claim_id, policy_id, claimant_name, claim_status,
            claim_amount, claim_description, claim_date, document_url
        ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
        ON CONFLICT (claim_id) DO UPDATE
        SET policy_id = EXCLUDED.policy_id,
            claimant_name = EXCLUDED.claimant_name,
            claim_status = EXCLUDED.claim_status,
            claim_amount = EXCLUDED.claim_amount,
            claim_description = EXCLUDED.claim_description,
            claim_date = EXCLUDED.claim_date,
            document_url = EXCLUDED.document_url;
        """
        # ``with conn`` commits on success / rolls back on error;
        # ``with conn.cursor()`` closes the cursor (the original leaked it).
        with conn:
            with conn.cursor() as cursor:
                cursor.execute(sql, (
                    doc["claim_id"], doc["policy_id"], doc["claimant_name"],
                    doc["claim_status"], doc["claim_amount"],
                    doc.get("claim_description", ""), doc["claim_date"],
                    doc["document_url"]
                ))
        print(f"✅ Inserted: {doc['claim_id']}")
    except Exception as e:
        # Top-level boundary: log and continue so one bad row doesn't stop the sync.
        print(f"❌ Database Error: {e}")
    finally:
        if conn:
            conn.close()
# ๐Ÿ”น Main script to sync data
def sync_data():
    """Pull every document from Azure AI Search and upsert it into YugabyteDB."""
    for document in fetch_documents():
        parsed = extract_claim_details(document.get("content", ""))
        insert_into_yugabyte(parsed)


# Run the sync process
if __name__ == "__main__":
    sync_data()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment