# Source: GitHub gist nicholishen/eab625bda3ee632ac5f2d695182a9a79
# Created: May 22, 2024 22:08
#
# NOTE: GitHub flagged this file as containing bidirectional Unicode text that
# may be interpreted or compiled differently than what appears below. To
# review, open the file in an editor that reveals hidden Unicode characters.
import asyncio
import json
import logging
import os
from pathlib import Path

import aiofiles
import openai
from openai import AsyncOpenAI
# Configure logging: timestamped, INFO-level records via the root logger.
logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
)

# Paths and constants
# Directory that is searched recursively for the ``*.txt`` files to process.
EXTRACTION_DIR = Path(r"path\to\your\directory\with\text\files")
# The combined results are written into the same directory as the inputs.
OUTPUT_FILE = EXTRACTION_DIR / "extracted_violations.json"
# Prefer the OPENAI_API_KEY environment variable so a real key never has to be
# stored in this file; the literal placeholder is kept only as a fallback.
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY", "your api key")

# Initialize the OpenAI client
# NOTE(review): the module-level assignment looks redundant next to the
# explicit client created below; kept for backward compatibility -- confirm
# before removing.
openai.api_key = OPENAI_API_KEY
client = AsyncOpenAI(api_key=OPENAI_API_KEY)
# Function-calling schema sent with every chat completion request. It names
# the function the model is offered and describes, as a JSON-Schema object,
# the fields to extract from a violation letter. Every field is a string;
# only the four entries listed under "required" are mandatory.
FUNC_SCHEMA = {
    "name": "extract_violation_details",
    "description": "Extracts violation details from HOA violation letters",
    "parameters": {
        "type": "object",
        "properties": {
            "homeowner_name": {
                "type": "string",
                "description": "The name of the homeowner(s) receiving the violation notice.",
            },
            "property_address": {
                "type": "string",
                "description": "The address of the property with the reported violation.",
            },
            "violation_type": {
                "type": "string",
                "description": "The type of violation reported in the notice (e.g., Roof – Dirty, Yard Maintenance).",
            },
            "violation_date": {
                "type": "string",
                "format": "date",
                "description": "The date the violation notice was issued.",
            },
            "letter_type": {
                "type": "string",
                "description": "The type of letter issued (e.g., Friendly Reminder, Second Notice).",
            },
            "compliance_action_required": {
                "type": "string",
                "description": "The specific action required by the homeowner to comply with the violation notice.",
            },
            "community_document": {
                "type": "string",
                "description": "The community governing document referenced in the violation notice.",
            },
            "document_section": {
                "type": "string",
                "description": "The specific section of the community governing document referenced.",
            },
            "contact_person": {
                "type": "string",
                "description": "The name of the contact person at the homeowners association for addressing the violation.",
            },
            "contact_phone": {
                "type": "string",
                "description": "The phone number of the contact person at the homeowners association.",
            },
            "contact_email": {
                "type": "string",
                "description": "The email address of the contact person at the homeowners association.",
            },
            "violation_status": {
                "type": "string",
                "description": "The current status of the violation (e.g., Pending, Resolved).",
            },
        },
        "required": [
            "homeowner_name",
            "property_address",
            "violation_type",
            "violation_date",
        ],
    },
}
# Call OpenAI API to extract details
async def call_openai_api_to_extract_details(text, semaphore):
    """Ask the chat model to extract violation details from one letter.

    The letter text is sent as the user message together with FUNC_SCHEMA,
    and the arguments of the function call the model returns are decoded
    from JSON.

    Args:
        text: Text of the letter; sent verbatim as the user message.
        semaphore: Async context manager (main() passes an
            ``asyncio.Semaphore``) held while the request is in flight, so
            the number of concurrent requests stays bounded.

    Returns:
        The decoded function-call arguments, or ``None`` when the request
        raised, the response carried no function call, or the arguments
        were not valid JSON. Every failure is logged; none is raised.
    """
    async with semaphore:
        try:
            completion = await client.chat.completions.create(
                model="gpt-4o",
                messages=[
                    {
                        "role": "system",
                        "content": "Extracts violation details from HOA violation letters.",
                    },
                    {"role": "user", "content": text},
                ],
                functions=[FUNC_SCHEMA],
                function_call="auto",
                max_tokens=1500,
            )
        except Exception as e:
            # Best-effort boundary: one failed request must not abort the
            # batch, so the error is logged and reported as None.
            logging.error(f"Error calling OpenAI API: {e}")
            return None

    # With function_call="auto" the model is allowed to answer in plain text,
    # in which case there is no function call to read arguments from.
    choices = completion.choices
    function_call = choices[0].message.function_call if choices else None
    if function_call is None:
        logging.error("Error calling OpenAI API: response contained no function call")
        return None

    try:
        return json.loads(function_call.arguments)
    except (TypeError, ValueError) as e:
        # Arguments can be malformed, e.g. when cut off by the max_tokens limit.
        logging.error(
            f"Error calling OpenAI API: function arguments are not valid JSON: {e}"
        )
        return None
# Process a single file
async def process_file(text_file_path, semaphore):
    """Read one text file and extract the violation details it contains.

    Args:
        text_file_path: Path object of the file to read; its ``.name`` is
            stored under the ``"file_name"`` key of the result.
        semaphore: Forwarded unchanged to
            call_openai_api_to_extract_details().

    Returns:
        A dict holding ``"file_name"`` plus the extracted fields, or ``None``
        when the file could not be read or the extraction yielded nothing
        usable. Failures are logged rather than raised so that one bad file
        does not abort the rest of the batch.
    """
    try:
        # No encoding is passed, so the platform default encoding applies.
        async with aiofiles.open(text_file_path, "r") as text_file:
            text = await text_file.read()
    except (OSError, UnicodeDecodeError) as e:
        logging.error(f"Failed to process file: {text_file_path} ({e})")
        return None

    extracted_data = await call_openai_api_to_extract_details(text, semaphore)

    # None, an empty result, or a non-object JSON value cannot be merged into
    # the output record; treat all of them as one logged failure.
    if not isinstance(extracted_data, dict) or not extracted_data:
        logging.error(f"Failed to process file: {text_file_path}")
        return None

    logging.info(f"Processed file: {text_file_path}")
    return {"file_name": text_file_path.name, **extracted_data}
# Main processing function
async def main(max_concurrency: int = 30) -> None:
    """Extract details from every ``*.txt`` file and save them as JSON.

    All text files found recursively under EXTRACTION_DIR are processed
    concurrently, the successful results are written to OUTPUT_FILE as a
    JSON list, and the file is then read back and logged as a sanity check.

    Args:
        max_concurrency: Upper bound on simultaneous API requests.

    Raises:
        ValueError: If ``max_concurrency`` is smaller than 1 (a semaphore
            with no permits would block every request forever).
    """
    if max_concurrency < 1:
        raise ValueError("max_concurrency must be at least 1")

    extracted_details = []
    text_files = list(EXTRACTION_DIR.rglob("*.txt"))
    semaphore = asyncio.Semaphore(max_concurrency)

    logging.info("Starting asynchronous processing of text files")
    tasks = [process_file(text_file_path, semaphore) for text_file_path in text_files]

    # Results are consumed in completion order, not in directory order.
    for task in asyncio.as_completed(tasks):
        try:
            data = await task
        except Exception as e:
            # Keep the results gathered so far: a single unexpected failure
            # is logged and skipped instead of aborting the whole run.
            logging.error(f"Unexpected error while processing a file: {e}")
            continue
        if data is not None:
            extracted_details.append(data)
            logging.info(f"Extracted details from {data['file_name']}")

    async with aiofiles.open(OUTPUT_FILE, "w", encoding="utf-8") as f:
        await f.write(json.dumps(extracted_details, indent=4))
    logging.info(f"Extraction and saving of JSON file completed: {OUTPUT_FILE}")

    # Read the file back to confirm that what was written parses as JSON.
    async with aiofiles.open(OUTPUT_FILE, "r", encoding="utf-8") as f:
        loaded_data = json.loads(await f.read())
    logging.info(f"Loaded data for verification: {loaded_data}")
# Run the main function only when executed as a script, not when imported.
if __name__ == "__main__":
    asyncio.run(main())
# (GitHub page footer: sign up for free or sign in to join the conversation
# and comment on this gist.)