Skip to content

Instantly share code, notes, and snippets.

@nicholishen
Created May 22, 2024 22:08
Show Gist options
  • Save nicholishen/eab625bda3ee632ac5f2d695182a9a79 to your computer and use it in GitHub Desktop.
Save nicholishen/eab625bda3ee632ac5f2d695182a9a79 to your computer and use it in GitHub Desktop.
import asyncio
import json
import logging
import os
from pathlib import Path

import aiofiles
import openai
from openai import AsyncOpenAI
# Root-logger setup: timestamped records at INFO level and above.
logging.basicConfig(
    format="%(asctime)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)
# Paths and constants
# Directory that main() searches recursively for the ``.txt`` files to process.
EXTRACTION_DIR = Path(r"path\to\your\directory\with\text\files")
# The combined results are written into the same directory as the inputs.
OUTPUT_FILE = EXTRACTION_DIR / "extracted_violations.json"
# Prefer the OPENAI_API_KEY environment variable so the secret does not have to
# live in the source file; the literal below is only a fallback placeholder.
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY", "your api key")
# Initialize the OpenAI client
# The module-level key is kept for any code relying on the global ``openai``
# configuration; the requests in this script go through ``client``.
openai.api_key = OPENAI_API_KEY
client = AsyncOpenAI(api_key=OPENAI_API_KEY)
# Function-calling schema handed to the chat completion request. It describes
# the single "function" whose JSON arguments carry the fields extracted from a
# letter. Every property is a string; only the first four are required.
FUNC_SCHEMA = {
    "name": "extract_violation_details",
    "description": "Extracts violation details from HOA violation letters",
    "parameters": {
        "type": "object",
        "properties": {
            # --- Who and where ---
            "homeowner_name": {
                "type": "string",
                "description": "The name of the homeowner(s) receiving the violation notice.",
            },
            "property_address": {
                "type": "string",
                "description": "The address of the property with the reported violation.",
            },
            # --- What and when ---
            "violation_type": {
                "type": "string",
                "description": "The type of violation reported in the notice (e.g., Roof – Dirty, Yard Maintenance).",
            },
            "violation_date": {
                "type": "string",
                "format": "date",
                "description": "The date the violation notice was issued.",
            },
            "letter_type": {
                "type": "string",
                "description": "The type of letter issued (e.g., Friendly Reminder, Second Notice).",
            },
            "compliance_action_required": {
                "type": "string",
                "description": "The specific action required by the homeowner to comply with the violation notice.",
            },
            # --- Governing document reference ---
            "community_document": {
                "type": "string",
                "description": "The community governing document referenced in the violation notice.",
            },
            "document_section": {
                "type": "string",
                "description": "The specific section of the community governing document referenced.",
            },
            # --- Association contact ---
            "contact_person": {
                "type": "string",
                "description": "The name of the contact person at the homeowners association for addressing the violation.",
            },
            "contact_phone": {
                "type": "string",
                "description": "The phone number of the contact person at the homeowners association.",
            },
            "contact_email": {
                "type": "string",
                "description": "The email address of the contact person at the homeowners association.",
            },
            # --- Status ---
            "violation_status": {
                "type": "string",
                "description": "The current status of the violation (e.g., Pending, Resolved).",
            },
        },
        "required": [
            "homeowner_name",
            "property_address",
            "violation_type",
            "violation_date",
        ],
    },
}
# Call OpenAI API to extract details
async def call_openai_api_to_extract_details(text, semaphore):
    """Ask the model to extract the violation fields from one letter.

    Args:
        text: Text of a violation letter; sent as the user message.
        semaphore: Async context manager (an ``asyncio.Semaphore`` in
            ``main``) held for the duration of the request so the number of
            concurrent API calls stays bounded.

    Returns:
        The function-call arguments decoded from JSON, or ``None`` when the
        request failed, the response carried no function call, or the
        arguments were not valid JSON. Failures are logged, never raised.
    """
    async with semaphore:
        try:
            completion = await client.chat.completions.create(
                model="gpt-4o",
                messages=[
                    {
                        "role": "system",
                        "content": "Extracts violation details from HOA violation letters.",
                    },
                    {"role": "user", "content": text},
                ],
                functions=[FUNC_SCHEMA],
                # Force the extraction function: with "auto" the model is free
                # to answer in plain text, which leaves ``function_call`` unset.
                function_call={"name": FUNC_SCHEMA["name"]},
                max_tokens=1500,
            )
            function_call = completion.choices[0].message.function_call
            if function_call is None:
                # Report the real cause instead of an opaque AttributeError.
                logging.error(
                    "Error calling OpenAI API: response contained no function call"
                )
                return None
            return json.loads(function_call.arguments)
        except Exception as e:
            # Deliberate best-effort boundary: one failed letter (API error,
            # truncated/invalid JSON, ...) must not stop the rest of the batch.
            logging.error(f"Error calling OpenAI API: {e}")
            return None
# Read a letter's text with an explicit encoding
async def _read_letter_text(text_file_path):
    """Return the file's text, preferring UTF-8 over the platform default."""
    try:
        async with aiofiles.open(text_file_path, "r", encoding="utf-8") as text_file:
            return await text_file.read()
    except UnicodeDecodeError:
        # Not valid UTF-8: fall back to the platform default encoding (what a
        # bare open() would use) and replace any byte that still cannot be
        # decoded rather than failing.
        async with aiofiles.open(text_file_path, "r", errors="replace") as text_file:
            return await text_file.read()


# Process a single file
async def process_file(text_file_path, semaphore):
    """Read one letter from disk and return its extracted record.

    Args:
        text_file_path: Path object of the text file to read; its ``name``
            becomes the ``file_name`` entry of the result.
        semaphore: Concurrency limiter forwarded to
            ``call_openai_api_to_extract_details``.

    Returns:
        A dict with ``file_name`` plus the extracted fields, or ``None`` when
        the file could not be read or the extraction produced nothing usable.
    """
    try:
        text = await _read_letter_text(text_file_path)
    except OSError as e:
        # An unreadable file is a per-file failure, not a reason to abort the
        # whole run.
        logging.error(f"Failed to process file: {text_file_path} ({e})")
        return None
    extracted_data = await call_openai_api_to_extract_details(text, semaphore)
    # One check drives both the log line and the return value; an empty or
    # non-dict result counts as a failure, so "Processed" is never logged for
    # a file that yields no record.
    if not isinstance(extracted_data, dict) or not extracted_data:
        logging.error(f"Failed to process file: {text_file_path}")
        return None
    logging.info(f"Processed file: {text_file_path}")
    return {"file_name": text_file_path.name, **extracted_data}
# Main processing function
async def main():
    """Extract details from every ``.txt`` file under ``EXTRACTION_DIR``.

    Files are processed concurrently (at most 30 API requests in flight), the
    successful records are collected in completion order, written to
    ``OUTPUT_FILE`` as a JSON array, and the file is then read back and logged
    as a sanity check.
    """
    extracted_details = []
    text_files = list(EXTRACTION_DIR.rglob("*.txt"))
    if not text_files:
        logging.warning(f"No .txt files found under {EXTRACTION_DIR}")
    semaphore = asyncio.Semaphore(30)  # Limit to 30 concurrent requests
    logging.info("Starting asynchronous processing of text files")
    tasks = [process_file(text_file_path, semaphore) for text_file_path in text_files]
    for task in asyncio.as_completed(tasks):
        try:
            data = await task
        except Exception as e:
            # A single failing file must not abort the loop and throw away
            # the records that were already collected.
            logging.error(f"Failed to process a file: {e}")
            continue
        if data is not None:
            extracted_details.append(data)
            logging.info(f"Extracted details from {data['file_name']}")
    async with aiofiles.open(OUTPUT_FILE, "w", encoding="utf-8") as f:
        # The file is UTF-8, so keep non-ASCII characters readable instead of
        # \uXXXX-escaping them.
        await f.write(json.dumps(extracted_details, indent=4, ensure_ascii=False))
    logging.info(f"Extraction and saving of JSON file completed: {OUTPUT_FILE}")
    # Read the file back to confirm that it parses as JSON.
    async with aiofiles.open(OUTPUT_FILE, "r", encoding="utf-8") as f:
        loaded_data = json.loads(await f.read())
    logging.info(f"Loaded data for verification: {loaded_data}")
# Script entry point: start the event loop only when executed directly,
# not when the module is imported.
if __name__ == "__main__":
    asyncio.run(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment