Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Select an option

  • Save standardnguyen/1852015e046acc702ecc8990ef5ec26e to your computer and use it in GitHub Desktop.

Select an option

Save standardnguyen/1852015e046acc702ecc8990ef5ec26e to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
"""
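DeepSeek API Concurrent Batch Processor
Reads chapter prompt files, sends them to the DeepSeek chat API concurrently,
and saves each reply as a response file. Uses explicit type annotations (e.g.
for the chat `messages` list) so static type checkers accept the API calls.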
"""
import os
import glob
import time
import asyncio
from pathlib import Path
from openai import AsyncOpenAI
from dotenv import load_dotenv
from typing import Optional, List, Dict, Any
# Load environment variables with python-dotenv so DEEPSEEK_API_KEY (read in
# DeepSeekConcurrentProcessor.__init__) can also come from a local .env file.
load_dotenv()
class DeepSeekConcurrentProcessor:
    """Send chapter prompt files to the DeepSeek chat API concurrently.

    Prompts are read from ``./prompts_for_raw_chapters/prompt_chapter_NN.md``
    and each reply is saved as ``./responses_from_deepseek/response_chapter_NN.md``.
    """

    def __init__(self):
        """Read the API key, build the async client and create the output directory.

        Raises:
            ValueError: if ``DEEPSEEK_API_KEY`` is unset or empty.
        """
        self.api_key = os.getenv('DEEPSEEK_API_KEY')
        if not self.api_key:
            raise ValueError("DEEPSEEK_API_KEY environment variable is required")
        # The OpenAI SDK client is pointed at DeepSeek's endpoint via base_url.
        self.client = AsyncOpenAI(
            api_key=self.api_key,
            base_url="https://api.deepseek.com"
        )
        # Input prompts and output responses (relative to the working directory).
        self.prompts_dir = Path("./prompts_for_raw_chapters/")
        self.responses_dir = Path("./responses_from_deepseek/")
        # parents=True so start-up does not fail if an intermediate directory is missing.
        self.responses_dir.mkdir(parents=True, exist_ok=True)

    def read_prompt_file(self, filepath: str) -> Optional[str]:
        """Return the whitespace-stripped UTF-8 text of *filepath*.

        Returns:
            The file content, or ``None`` (after printing the error) if the file
            cannot be opened or decoded.
        """
        try:
            with open(filepath, 'r', encoding='utf-8') as f:
                return f.read().strip()
        except (OSError, UnicodeDecodeError) as e:
            print(f"Error reading file {filepath}: {e}")
            return None

    async def send_to_deepseek_async(self, prompt: str, model: str = "deepseek-chat",
                                     label: Optional[str] = None) -> Optional[str]:
        """Send *prompt* as a single user message and return the reply text.

        Args:
            prompt: Text of the user message.
            model: DeepSeek model name.
            label: Optional tag (e.g. the prompt filename) added to warnings and
                errors so concurrent output can be attributed to a file.

        Returns:
            The assistant message content, or ``None`` if the request failed.
            A reply cut off by ``max_tokens`` is still returned, with a warning.
        """
        where = f" [{label}]" if label else ""
        try:
            # Explicitly typed list; the SDK's stricter message types are silenced below.
            messages: List[Dict[str, str]] = [
                {"role": "user", "content": prompt}
            ]
            response = await self.client.chat.completions.create(
                model=model,
                messages=messages,  # type: ignore
                max_tokens=8000,
                stream=False
            )
            choice = response.choices[0]
            # finish_reason "length" means the reply stopped at max_tokens (incomplete).
            if choice.finish_reason == "length":
                print(f"⚠️ Warning{where}: Response was truncated due to max_tokens limit")
            return choice.message.content
        except Exception as e:
            # Network/auth/API errors: report and let the caller count a failure.
            print(f"API Error{where}: {e}")
            return None

    def save_response(self, response_text: str, output_filename: str) -> bool:
        """Write *response_text* to ``responses_dir / output_filename``.

        Returns:
            True on success, False (after printing the error) on an I/O failure.
        """
        output_path = self.responses_dir / output_filename
        try:
            with open(output_path, 'w', encoding='utf-8') as f:
                f.write(response_text)
        except OSError as e:
            print(f"Error saving response to {output_filename}: {e}")
            return False
        print(f"✓ Saved response to {output_path}")
        return True

    def get_specific_prompt_files(self, chapter_numbers: List[int]) -> List[str]:
        """Return existing prompt files for *chapter_numbers*, sorted and de-duplicated.

        Chapter ``n`` maps to ``prompt_chapter_{n:02d}.md`` in ``prompts_dir``.
        Missing chapters are reported and skipped; a chapter listed twice is
        returned once so it is not processed (and written) twice concurrently.
        """
        files = set()
        for chapter_num in chapter_numbers:
            # Leading zero for single-digit chapters: 3 -> "03".
            candidate = self.prompts_dir / f"prompt_chapter_{chapter_num:02d}.md"
            # A direct check instead of glob() is not confused by glob
            # metacharacters such as '[' appearing in the directory path.
            if candidate.is_file():
                files.add(str(candidate))
            else:
                print(f"Warning: No file found for chapter {chapter_num} (pattern: {candidate})")
        return sorted(files)

    async def process_single_file(self, filepath: str) -> bool:
        """Read one prompt file, query DeepSeek and save the reply.

        ``prompt_chapter_01.md`` is saved as ``response_chapter_01.md``.

        Returns:
            True if the response was obtained and saved, otherwise False.
        """
        filename = Path(filepath).name
        print(f"Starting processing: {filename}")

        prompt = self.read_prompt_file(filepath)
        if prompt is None:
            print(f"✗ Skipped {filename} (read error)")
            return False

        response = await self.send_to_deepseek_async(prompt, label=filename)
        if response is None:
            print(f"✗ Failed to get response for {filename}")
            return False

        output_filename = filename.replace("prompt_", "response_")
        if self.save_response(response, output_filename):
            print(f"✓ Completed processing: {filename}")
            return True
        print(f"✗ Failed to save response for {filename}")
        return False

    async def process_files_concurrently(self, chapter_numbers: List[int]):
        """Process the prompt files for *chapter_numbers* concurrently and print a summary."""
        prompt_files = self.get_specific_prompt_files(chapter_numbers)
        if not prompt_files:
            print(f"No files found for chapters: {chapter_numbers}")
            return

        print(f"Found {len(prompt_files)} files to process concurrently:")
        for file in prompt_files:
            print(f" - {Path(file).name}")
        print(f"Responses will be saved to: {self.responses_dir}")
        print("-" * 50)

        start_time = time.perf_counter()
        # return_exceptions=True makes gather wait for every task and hand back
        # exceptions as results instead of raising the first one.
        results = await asyncio.gather(
            *(self.process_single_file(filepath) for filepath in prompt_files),
            return_exceptions=True,
        )
        # Those exception results must be reported explicitly, otherwise a crash
        # is indistinguishable from an ordinary failure.
        for filepath, result in zip(prompt_files, results):
            if isinstance(result, BaseException):
                print(f"✗ Unexpected error processing {Path(filepath).name}: {result!r}")

        successful = sum(1 for result in results if result is True)
        failed = len(results) - successful
        elapsed_time = time.perf_counter() - start_time

        print("-" * 50)
        print("Concurrent processing complete!")
        print(f"✓ Successful: {successful}")
        print(f"✗ Failed: {failed}")
        print(f"⏱ Total time: {elapsed_time:.2f} seconds")
        # results is non-empty here because prompt_files was non-empty.
        print(f"⚑ Average time per file: {elapsed_time / len(results):.2f} seconds")
def main():
    """Script entry point: process a fixed selection of chapters concurrently.

    Errors (including a missing API key) are printed rather than raised, and
    Ctrl+C prints an interruption notice.
    """
    try:
        processor = DeepSeekConcurrentProcessor()
        # Chapters 3, 6 and 8 through 29 (range's upper bound is exclusive).
        selected = [3, 6, *range(8, 30)]
        print(f"Processing chapters: {selected}")
        asyncio.run(processor.process_files_concurrently(selected))
    except KeyboardInterrupt:
        print("\nProcess interrupted by user")
    except Exception as exc:
        print(f"Error: {exc}")


if __name__ == "__main__":
    main()
#!/usr/bin/env python3
"""
DeepSeek Continuation Fixer
This script identifies truncated responses and uses Chat Prefix Completion
to continue them until they're complete.
"""
import os
import glob
import time
import asyncio
from pathlib import Path
from openai import AsyncOpenAI
from dotenv import load_dotenv
import re
# Load environment variables with python-dotenv so DEEPSEEK_API_KEY (read in
# DeepSeekContinuationFixer.__init__) can also come from a local .env file.
load_dotenv()
class DeepSeekContinuationFixer:
    """Find truncated chapter responses and extend them with Chat Prefix Completion.

    The text generated so far is re-sent as an assistant message flagged
    ``"prefix": True`` to the DeepSeek beta endpoint, and each returned chunk is
    appended until the model stops for a reason other than hitting max_tokens.
    """

    def __init__(self):
        """Read the API key, build the API clients and create the backup directory.

        Raises:
            ValueError: if ``DEEPSEEK_API_KEY`` is unset or empty.
        """
        self.api_key = os.getenv('DEEPSEEK_API_KEY')
        if not self.api_key:
            raise ValueError("DEEPSEEK_API_KEY environment variable is required")
        # Chat Prefix Completion is requested through the /beta base URL.
        self.beta_client = AsyncOpenAI(
            api_key=self.api_key,
            base_url="https://api.deepseek.com/beta"
        )
        # NOTE(review): regular_client is not used by this class; kept so any
        # external code reading the attribute keeps working.
        self.regular_client = AsyncOpenAI(
            api_key=self.api_key,
            base_url="https://api.deepseek.com"
        )
        self.prompts_dir = Path("./prompts_for_raw_chapters/")
        self.responses_dir = Path("./responses_from_deepseek/")
        self.backup_dir = Path("./responses_from_deepseek/backups/")
        # parents=True so this also works before responses_from_deepseek/ exists.
        self.backup_dir.mkdir(parents=True, exist_ok=True)

    def read_file(self, filepath):
        """Return the whitespace-stripped UTF-8 text of *filepath*, or None on error."""
        try:
            with open(filepath, 'r', encoding='utf-8') as f:
                return f.read().strip()
        except (OSError, UnicodeDecodeError) as e:
            print(f"Error reading file {filepath}: {e}")
            return None

    def save_file(self, content, filepath):
        """Write *content* to *filepath* as UTF-8; return True on success, False on error."""
        try:
            with open(filepath, 'w', encoding='utf-8') as f:
                f.write(content)
            return True
        except OSError as e:
            print(f"Error saving file {filepath}: {e}")
            return False

    def is_response_truncated(self, response_text):
        """Heuristically decide whether *response_text* looks cut off.

        Returns True if any of these hold:

        * more than 100 characters (after stripping) and not ending in one of
          ``. ! ? " ' ) ] }``;
        * ends in a letter/digit with no space character in its last 20 characters;
        * more than 5000 characters and not ending in sentence punctuation;
        * an odd number of ``` fences or ``**`` markers (unclosed markdown).

        NOTE(review): these are heuristics — a complete answer that ends in, say,
        a table row or uses ``***`` may be reported as truncated.
        """
        text = response_text.strip()
        ends_closed = text.endswith(('.', '!', '?', '"', "'", ')', ']', '}'))
        ends_sentence = text.endswith(('.', '!', '?', '."', ".'", '?"', "?'"))
        checks = (
            # Ends mid-sentence.
            not ends_closed and len(text) > 100,
            # Ends with what may be an incomplete word.
            bool(text) and text[-1].isalnum() and ' ' not in text[-20:],
            # Very abrupt ending after long content (length measured unstripped).
            len(response_text) > 5000 and not ends_sentence,
            # Unbalanced markdown markers.
            response_text.count('```') % 2 != 0 or response_text.count('**') % 2 != 0,
        )
        return any(checks)

    async def continue_response(self, original_prompt, truncated_response, max_continuations=5):
        """Extend *truncated_response* using Chat Prefix Completion.

        Each round re-sends *original_prompt* plus the text so far as a prefixed
        assistant message and appends the returned chunk. Stops when the model
        returns no text, finishes for a reason other than ``"length"``, or after
        *max_continuations* appended chunks.

        Returns:
            The extended text. If an API call fails, whatever was accumulated
            before the failure is returned (possibly just *truncated_response*),
            so continuations that were already generated are not discarded.
        """
        full_response = truncated_response
        continuation_count = 0
        completed = False
        print(" 🔄 Attempting to continue truncated response...")
        try:
            while continuation_count < max_continuations:
                continuation_response = await self.beta_client.chat.completions.create(
                    model="deepseek-chat",
                    messages=[
                        {"role": "user", "content": original_prompt},
                        # "prefix": True asks the model to continue this text.
                        {"role": "assistant", "content": full_response, "prefix": True},
                    ],
                    max_tokens=8000,
                    stream=False,
                )
                choice = continuation_response.choices[0]
                continuation_text = choice.message.content or ""
                if not continuation_text.strip():
                    print(" ✓ Response appears complete (empty continuation)")
                    completed = True
                    break

                # Keep even very short chunks: finishing a cut-off word or
                # sentence may need only a few characters.
                full_response += continuation_text
                continuation_count += 1
                print(f" 📄 Added continuation {continuation_count} ({len(continuation_text)} chars)")

                # Only a "length" stop means this chunk was itself cut off.
                if choice.finish_reason != "length":
                    print(f" ✓ Response completed after {continuation_count} continuations")
                    completed = True
                    break

                # Small delay between continuations.
                await asyncio.sleep(0.5)
        except Exception as e:
            print(f" ✗ Error during continuation: {e}")
            return full_response

        if not completed:
            print(f" ⚠️ Reached max continuations ({max_continuations}), response may still be incomplete")
        return full_response

    def get_truncated_files(self):
        """Return the sorted paths of ``response_chapter_*.md`` files that look truncated."""
        response_files = glob.glob(str(self.responses_dir / "response_chapter_*.md"))
        truncated_files = []
        print("🔍 Scanning for truncated responses...")
        for filepath in response_files:
            content = self.read_file(filepath)
            if content and self.is_response_truncated(content):
                truncated_files.append(filepath)
                print(f" 📄 Found truncated: {Path(filepath).name}")
        return sorted(truncated_files)

    def get_original_prompt(self, response_filepath):
        """Return the prompt text for a response file (``response_X`` -> ``prompt_X``), or None."""
        response_filename = Path(response_filepath).name
        prompt_filename = response_filename.replace("response_", "prompt_")
        return self.read_file(self.prompts_dir / prompt_filename)

    async def fix_single_file(self, response_filepath):
        """Back up, continue and rewrite one truncated response file.

        Returns:
            True if longer content was generated and saved, otherwise False.
        """
        filename = Path(response_filepath).name
        print(f"\n🔧 Fixing: {filename}")

        truncated_response = self.read_file(response_filepath)
        if not truncated_response:
            print(" ✗ Could not read response file")
            return False

        original_prompt = self.get_original_prompt(response_filepath)
        if not original_prompt:
            print(" ✗ Could not find original prompt")
            return False

        # Backup is best-effort; the rewrite below only ever appends text.
        backup_filepath = self.backup_dir / f"{filename}.backup"
        if not self.save_file(truncated_response, backup_filepath):
            print(" ⚠️ Could not create backup")
        else:
            print(f" 💾 Backup saved to {backup_filepath}")

        complete_response = await self.continue_response(original_prompt, truncated_response)
        if len(complete_response) <= len(truncated_response):
            print(" ⚠️ No additional content generated")
            return False

        if self.save_file(complete_response, response_filepath):
            added_length = len(complete_response) - len(truncated_response)
            print(f" ✓ Fixed! Added {added_length} characters")
            return True
        print(" ✗ Failed to save completed response")
        return False

    async def _run_fixes(self, filepaths, done_message):
        """Fix *filepaths* one at a time, then print *done_message* and a summary."""
        start_time = time.perf_counter()
        successful = 0
        failed = 0
        for index, filepath in enumerate(filepaths):
            try:
                if await self.fix_single_file(filepath):
                    successful += 1
                else:
                    failed += 1
            except Exception as e:
                # Per-file boundary: one unexpected failure must not stop the batch.
                print(f" ✗ Error fixing {Path(filepath).name}: {e}")
                failed += 1
            # Small delay between files (none after the last one).
            if index < len(filepaths) - 1:
                await asyncio.sleep(1)

        elapsed_time = time.perf_counter() - start_time
        print("-" * 50)
        print(done_message)
        print(f"✓ Successfully fixed: {successful}")
        print(f"✗ Failed to fix: {failed}")
        print(f"⏱ Total time: {elapsed_time:.2f} seconds")

    async def fix_all_truncated(self):
        """Fix every response file that looks truncated."""
        truncated_files = self.get_truncated_files()
        if not truncated_files:
            print("✅ No truncated files found!")
            return
        print(f"\n🎯 Found {len(truncated_files)} truncated files to fix")
        print("-" * 50)
        await self._run_fixes(truncated_files, "🏁 Completion fixing complete!")

    async def fix_specific_chapters(self, chapter_numbers):
        """Fix the truncated response files whose chapter number is in *chapter_numbers*."""
        wanted = set(chapter_numbers)
        specific_truncated = []
        for filepath in self.get_truncated_files():
            # Extract chapter number from a name like "response_chapter_13.md".
            match = re.search(r'response_chapter_(\d+)\.md', Path(filepath).name)
            if match and int(match.group(1)) in wanted:
                specific_truncated.append(filepath)

        if not specific_truncated:
            print(f"✅ No truncated files found for chapters {chapter_numbers}!")
            return

        print(f"🎯 Found {len(specific_truncated)} truncated files for chapters {chapter_numbers}")
        for filepath in specific_truncated:
            print(f" 📄 {Path(filepath).name}")
        print("-" * 50)
        await self._run_fixes(specific_truncated, "🏁 Chapter fixing complete!")
def _parse_chapter_numbers(text):
    """Parse a chapter selection such as ``"10,13,15"`` or ``"13-29"`` into ints.

    Comma-separated items are single numbers or inclusive ``start-end`` ranges.
    Blank items (e.g. from a trailing comma) are ignored, and a reversed range
    such as ``"29-13"`` is read as ``"13-29"`` instead of silently matching nothing.

    Raises:
        ValueError: if an item is neither a number nor a ``start-end`` range.
    """
    numbers = []
    for part in text.split(','):
        part = part.strip()
        if not part:
            continue
        if '-' in part:
            start_text, _, end_text = part.partition('-')
            # int() rejects e.g. "13-15-17" (end_text "15-17") with a ValueError.
            start, end = int(start_text), int(end_text)
            low, high = min(start, end), max(start, end)
            numbers.extend(range(low, high + 1))
        else:
            numbers.append(int(part))
    return numbers


def main():
    """Interactive entry point: fix all truncated responses or selected chapters.

    Errors are printed rather than raised; Ctrl+C prints an interruption notice.
    """
    try:
        fixer = DeepSeekContinuationFixer()
        print("DeepSeek Continuation Fixer")
        print("=" * 50)
        print("1. Fix all truncated responses")
        print("2. Fix specific chapters")
        choice = input("\nChoose option (1 or 2): ").strip()
        if choice == "1":
            asyncio.run(fixer.fix_all_truncated())
        elif choice == "2":
            chapters_input = input("Enter chapter numbers (e.g., 10,13,15 or 13-29): ").strip()
            try:
                chapter_numbers = _parse_chapter_numbers(chapters_input)
            except ValueError as e:
                print(f"Invalid chapter list {chapters_input!r}: {e}")
                return
            if not chapter_numbers:
                print("No chapter numbers given!")
                return
            asyncio.run(fixer.fix_specific_chapters(chapter_numbers))
        else:
            print("Invalid choice!")
    except KeyboardInterrupt:
        print("\nProcess interrupted by user")
    except Exception as e:
        print(f"Error: {e}")


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment