Created
July 21, 2025 20:50
-
-
Save standardnguyen/1852015e046acc702ecc8990ef5ec26e to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| """ | |
| DeepSeek API Concurrent Batch Processor - Fixed Type Errors | |
| This version fixes the type annotation errors you're seeing. | |
| """ | |
| import os | |
| import glob | |
| import time | |
| import asyncio | |
| from pathlib import Path | |
| from openai import AsyncOpenAI | |
| from dotenv import load_dotenv | |
| from typing import Optional, List, Dict, Any | |
| # Load environment variables | |
| load_dotenv() | |
class DeepSeekConcurrentProcessor:
    """Send chapter prompt files to the DeepSeek chat API concurrently.

    Prompts are read from ``./prompts_for_raw_chapters/`` (files named
    ``prompt_chapter_NN.md``) and each reply is written to
    ``./responses_from_deepseek/`` as ``response_chapter_NN.md``.
    """

    # NOTE(review): the status glyphs at the start of several messages below
    # look mis-encoded (probably emoji in the original file). They are kept
    # verbatim here -- confirm against the original source before changing.

    def __init__(self):
        """Create the API client and make sure the output directory exists.

        Raises:
            ValueError: if ``DEEPSEEK_API_KEY`` is not set in the environment.
        """
        self.api_key = os.getenv('DEEPSEEK_API_KEY')
        if not self.api_key:
            raise ValueError("DEEPSEEK_API_KEY environment variable is required")

        # DeepSeek is reached through the OpenAI client by overriding base_url.
        self.client = AsyncOpenAI(
            api_key=self.api_key,
            base_url="https://api.deepseek.com"
        )

        self.prompts_dir = Path("./prompts_for_raw_chapters/")
        self.responses_dir = Path("./responses_from_deepseek/")

        # parents=True so a nested output path also works.
        self.responses_dir.mkdir(parents=True, exist_ok=True)

    def read_prompt_file(self, filepath: str) -> Optional[str]:
        """Return the stripped text of ``filepath``, or None if it cannot be read."""
        try:
            with open(filepath, 'r', encoding='utf-8') as f:
                return f.read().strip()
        except (OSError, UnicodeError) as e:
            print(f"Error reading file {filepath}: {e}")
            return None

    async def send_to_deepseek_async(self, prompt: str, model: str = "deepseek-chat",
                                     max_tokens: int = 8000) -> Optional[str]:
        """Send ``prompt`` as a single user message and return the reply text.

        Args:
            prompt: text of the user message.
            model: model name passed to the API.
            max_tokens: completion token limit passed to the API.

        Returns:
            The reply content, or None when the API call fails. A warning is
            printed when the API reports the reply was cut off by ``max_tokens``.
        """
        try:
            messages: List[Dict[str, str]] = [
                {"role": "user", "content": prompt}
            ]
            response = await self.client.chat.completions.create(
                model=model,
                messages=messages,  # type: ignore
                max_tokens=max_tokens,
                stream=False
            )
            choice = response.choices[0]
            # finish_reason == "length" means the token limit cut the reply short.
            if choice.finish_reason == "length":
                print("β οΈ Warning: Response was truncated due to max_tokens limit")
            return choice.message.content
        except Exception as e:
            # Boundary with the remote API: report and let the caller skip this file.
            print(f"API Error: {e}")
            return None

    def save_response(self, response_text: str, output_filename: str) -> bool:
        """Write ``response_text`` to ``output_filename`` inside the responses directory.

        Returns:
            True on success, False if the file could not be written.
        """
        try:
            output_path = self.responses_dir / output_filename
            with open(output_path, 'w', encoding='utf-8') as f:
                f.write(response_text)
            print(f"β Saved response to {output_path}")
            return True
        except Exception as e:
            print(f"Error saving response to {output_filename}: {e}")
            return False

    def get_specific_prompt_files(self, chapter_numbers: List[int]) -> List[str]:
        """Return the sorted paths of the prompt files for ``chapter_numbers``.

        Chapter numbers are zero-padded to two digits. Numbers without a
        matching file produce a warning and are skipped. Repeated numbers are
        collapsed so the same file is never queued twice.
        """
        files = []
        # dict.fromkeys drops duplicates while keeping the caller's order, so
        # warnings still appear in the order the chapters were requested.
        for chapter_num in dict.fromkeys(chapter_numbers):
            path = self.prompts_dir / f"prompt_chapter_{chapter_num:02d}.md"
            if path.is_file():
                files.append(str(path))
            else:
                print(f"Warning: No file found for chapter {chapter_num} (pattern: {path})")
        return sorted(files)

    async def process_single_file(self, filepath: str) -> bool:
        """Read one prompt file, query the API and save the reply.

        Returns:
            True only when the reply was obtained and written to disk.
        """
        filename = Path(filepath).name
        print(f"Starting processing: {filename}")

        prompt = self.read_prompt_file(filepath)
        if prompt is None:
            print(f"β Skipped {filename} (read error)")
            return False
        if not prompt:
            # Nothing to ask: do not spend an API call on an empty prompt.
            print(f"β Skipped {filename} (empty prompt)")
            return False

        response = await self.send_to_deepseek_async(prompt)
        if response is None:
            print(f"β Failed to get response for {filename}")
            return False

        # prompt_chapter_01.md -> response_chapter_01.md (first occurrence only)
        output_filename = filename.replace("prompt_", "response_", 1)

        if self.save_response(response, output_filename):
            print(f"β Completed processing: {filename}")
            return True
        print(f"β Failed to save response for {filename}")
        return False

    async def process_files_concurrently(self, chapter_numbers: List[int],
                                         max_concurrency: Optional[int] = None):
        """Process the prompt files of ``chapter_numbers`` concurrently.

        Args:
            chapter_numbers: chapters whose prompt files should be processed.
            max_concurrency: optional cap on simultaneous requests; None (the
                default) starts every request at once.

        Prints a summary with success/failure counts and timing.
        """
        prompt_files = self.get_specific_prompt_files(chapter_numbers)
        if not prompt_files:
            print(f"No files found for chapters: {chapter_numbers}")
            return

        print(f"Found {len(prompt_files)} files to process concurrently:")
        for file in prompt_files:
            print(f" - {Path(file).name}")
        print(f"Responses will be saved to: {self.responses_dir}")
        print("-" * 50)

        limiter = asyncio.Semaphore(max_concurrency) if max_concurrency else None

        async def run_one(filepath: str) -> bool:
            if limiter is None:
                return await self.process_single_file(filepath)
            async with limiter:
                return await self.process_single_file(filepath)

        start_time = time.time()
        results = await asyncio.gather(
            *(run_one(filepath) for filepath in prompt_files),
            return_exceptions=True
        )

        # gather() hands back raised exceptions as results; surface them
        # instead of silently counting them as failures.
        for filepath, result in zip(prompt_files, results):
            if isinstance(result, BaseException):
                print(f"Unexpected error for {Path(filepath).name}: {result!r}")

        successful = sum(1 for result in results if result is True)
        failed = len(results) - successful
        elapsed_time = time.time() - start_time

        print("-" * 50)
        print("Concurrent processing complete!")
        print(f"β Successful: {successful}")
        print(f"β Failed: {failed}")
        print(f"β± Total time: {elapsed_time:.2f} seconds")
        print(f"β‘ Average time per file: {elapsed_time/len(results):.2f} seconds")
def main():
    """Script entry point: run the concurrent processor over a fixed chapter list.

    Any failure (including a missing API key) is reported on stdout instead of
    propagating; Ctrl-C is reported as a user interruption.
    """
    try:
        runner = DeepSeekConcurrentProcessor()

        # Chapters 3, 6 and 8-12, followed by 13 through 29.
        selected_chapters = [3, 6, 8, 9, 10, 11, 12]
        selected_chapters += list(range(13, 30))
        print(f"Processing chapters: {selected_chapters}")

        # Drive the async pipeline to completion on a fresh event loop.
        job = runner.process_files_concurrently(selected_chapters)
        asyncio.run(job)
    except KeyboardInterrupt:
        print("\nProcess interrupted by user")
    except Exception as exc:
        print(f"Error: {exc}")


if __name__ == "__main__":
    main()
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| """ | |
| DeepSeek Continuation Fixer | |
| This script identifies truncated responses and uses Chat Prefix Completion | |
| to continue them until they're complete. | |
| """ | |
| import os | |
| import glob | |
| import time | |
| import asyncio | |
| from pathlib import Path | |
| from openai import AsyncOpenAI | |
| from dotenv import load_dotenv | |
| import re | |
| # Load environment variables | |
| load_dotenv() | |
class DeepSeekContinuationFixer:
    """Find truncated DeepSeek responses on disk and extend them in place.

    Response files (``response_chapter_*.md``) are checked with text
    heuristics; files that look cut off are continued through the DeepSeek
    beta endpoint by sending the existing text as an assistant prefix. The
    pre-fix text of each file is copied to a backup directory first.
    """

    # NOTE(review): the status glyphs at the start of several messages below
    # look mis-encoded (probably emoji in the original file). They are kept
    # verbatim here -- confirm against the original source before changing.

    def __init__(self):
        """Create the API clients and make sure the backup directory exists.

        Raises:
            ValueError: if ``DEEPSEEK_API_KEY`` is not set in the environment.
        """
        self.api_key = os.getenv('DEEPSEEK_API_KEY')
        if not self.api_key:
            raise ValueError("DEEPSEEK_API_KEY environment variable is required")

        # Beta endpoint: used for Chat Prefix Completion (continuations).
        self.beta_client = AsyncOpenAI(
            api_key=self.api_key,
            base_url="https://api.deepseek.com/beta"
        )
        # Regular endpoint client.
        self.regular_client = AsyncOpenAI(
            api_key=self.api_key,
            base_url="https://api.deepseek.com"
        )

        self.prompts_dir = Path("./prompts_for_raw_chapters/")
        self.responses_dir = Path("./responses_from_deepseek/")
        self.backup_dir = Path("./responses_from_deepseek/backups/")

        # parents=True: the backup directory is nested inside the responses
        # directory, which may not exist yet when this script runs first.
        self.backup_dir.mkdir(parents=True, exist_ok=True)

    def read_file(self, filepath):
        """Return the stripped text of ``filepath``, or None if it cannot be read."""
        try:
            with open(filepath, 'r', encoding='utf-8') as f:
                return f.read().strip()
        except Exception as e:
            print(f"Error reading file {filepath}: {e}")
            return None

    def save_file(self, content, filepath) -> bool:
        """Write ``content`` to ``filepath``; return True on success, False on error."""
        try:
            with open(filepath, 'w', encoding='utf-8') as f:
                f.write(content)
            return True
        except Exception as e:
            print(f"Error saving file {filepath}: {e}")
            return False

    def is_response_truncated(self, response_text) -> bool:
        """Heuristically decide whether ``response_text`` was cut off.

        The text is reported as truncated when any of these holds:
        it is longer than 100 characters and does not end with closing
        punctuation; it ends in an alphanumeric run with no space in its last
        20 characters; it is longer than 5000 characters and does not end
        with sentence punctuation; or it contains an odd number of code-fence
        or ``**`` markers.
        """
        stripped = response_text.strip()

        # Ends mid-sentence (only judged on texts longer than 100 characters).
        if len(stripped) > 100 and not stripped.endswith(
                ('.', '!', '?', '"', "'", ')', ']', '}')):
            return True

        # Ends with what looks like an incomplete word.
        if stripped and stripped[-1].isalnum() and ' ' not in stripped[-20:]:
            return True

        # Long content with an abrupt ending (stricter terminator list).
        if len(response_text) > 5000 and not stripped.endswith(
                ('.', '!', '?', '."', ".'", '?"', "?'")):
            return True

        # Unbalanced markdown: an opening fence or bold marker never closed.
        return response_text.count('```') % 2 != 0 or response_text.count('**') % 2 != 0

    async def continue_response(self, original_prompt, truncated_response, max_continuations=5):
        """Extend ``truncated_response`` by repeated prefix completions.

        Each round sends the original prompt plus everything gathered so far
        as an assistant prefix and appends the returned text. The loop stops
        when a round returns (almost) nothing, when the API no longer reports
        a length cut-off, or after ``max_continuations`` rounds.

        Returns:
            The extended response, or the untouched ``truncated_response`` if
            any API call raises.
        """
        full_response = truncated_response
        try:
            print(" π Attempting to continue truncated response...")

            finished = False
            for continuation_count in range(1, max_continuations + 1):
                continuation_response = await self.beta_client.chat.completions.create(
                    model="deepseek-chat",
                    messages=[
                        {"role": "user", "content": original_prompt},
                        {"role": "assistant", "content": full_response, "prefix": True}
                    ],
                    max_tokens=8000,
                    stream=False
                )
                choice = continuation_response.choices[0]
                continuation_text = choice.message.content

                # A near-empty continuation is taken as "nothing left to say".
                if not continuation_text or len(continuation_text.strip()) < 10:
                    print(" β Response appears complete (short continuation)")
                    finished = True
                    break

                full_response += continuation_text
                print(f" π Added continuation {continuation_count} ({len(continuation_text)} chars)")

                # Anything other than "length" means this round was not cut off.
                if choice.finish_reason != "length":
                    print(f" β Response completed after {continuation_count} continuations")
                    finished = True
                    break

                # Small delay between continuations.
                await asyncio.sleep(0.5)

            # Warn only when the rounds ran out; a response that completed on
            # the last allowed round must not be reported as incomplete.
            if not finished:
                print(f" β οΈ Reached max continuations ({max_continuations}), response may still be incomplete")

            return full_response
        except Exception as e:
            print(f" β Error during continuation: {e}")
            return truncated_response  # Return original if continuation fails

    def get_truncated_files(self):
        """Return the sorted paths of response files that look truncated."""
        response_files = glob.glob(str(self.responses_dir / "response_chapter_*.md"))
        truncated_files = []

        print("π Scanning for truncated responses...")
        for filepath in response_files:
            content = self.read_file(filepath)
            # Unreadable or empty files are skipped rather than flagged.
            if content and self.is_response_truncated(content):
                truncated_files.append(filepath)
                print(f" π Found truncated: {Path(filepath).name}")
        return sorted(truncated_files)

    def get_original_prompt(self, response_filepath):
        """Return the prompt text matching a response file, or None if unreadable."""
        response_filename = Path(response_filepath).name
        # response_chapter_01.md -> prompt_chapter_01.md (first occurrence only)
        prompt_filename = response_filename.replace("response_", "prompt_", 1)
        return self.read_file(self.prompts_dir / prompt_filename)

    async def fix_single_file(self, response_filepath) -> bool:
        """Back up, continue and rewrite one truncated response file.

        Returns:
            True when extra content was generated and saved; False when the
            response or prompt is missing, nothing was added, or saving failed.
        """
        filename = Path(response_filepath).name
        print(f"\nπ§ Fixing: {filename}")

        truncated_response = self.read_file(response_filepath)
        if not truncated_response:
            print(" β Could not read response file")
            return False

        original_prompt = self.get_original_prompt(response_filepath)
        if not original_prompt:
            print(" β Could not find original prompt")
            return False

        # Keep a copy of the pre-fix text; a failed backup is only a warning.
        backup_filepath = self.backup_dir / f"{filename}.backup"
        if not self.save_file(truncated_response, backup_filepath):
            print(" β οΈ Could not create backup")
        else:
            print(f" πΎ Backup saved to {backup_filepath}")

        complete_response = await self.continue_response(original_prompt, truncated_response)

        # continue_response returns its input unchanged when nothing was added.
        if len(complete_response) <= len(truncated_response):
            print(" β οΈ No additional content generated")
            return False

        if self.save_file(complete_response, response_filepath):
            added_length = len(complete_response) - len(truncated_response)
            print(f" β Fixed! Added {added_length} characters")
            return True
        print(" β Failed to save completed response")
        return False

    async def _fix_files(self, filepaths, done_message):
        """Fix ``filepaths`` one at a time and print a summary headed by ``done_message``."""
        start_time = time.time()
        successful = 0
        failed = 0

        for filepath in filepaths:
            try:
                if await self.fix_single_file(filepath):
                    successful += 1
                else:
                    failed += 1
            except Exception as e:
                print(f" β Error fixing {Path(filepath).name}: {e}")
                failed += 1
            # Small delay between files.
            await asyncio.sleep(1)

        elapsed_time = time.time() - start_time
        print("-" * 50)
        print(done_message)
        print(f"β Successfully fixed: {successful}")
        print(f"β Failed to fix: {failed}")
        print(f"β± Total time: {elapsed_time:.2f} seconds")

    async def fix_all_truncated(self):
        """Fix every response file that looks truncated."""
        truncated_files = self.get_truncated_files()
        if not truncated_files:
            print("β No truncated files found!")
            return

        print(f"\nπ― Found {len(truncated_files)} truncated files to fix")
        print("-" * 50)
        await self._fix_files(truncated_files, "π Completion fixing complete!")

    async def fix_specific_chapters(self, chapter_numbers):
        """Fix only the truncated response files belonging to ``chapter_numbers``."""
        all_truncated = self.get_truncated_files()

        wanted = set(chapter_numbers)
        specific_truncated = []
        for filepath in all_truncated:
            # Chapter number comes from names like "response_chapter_13.md".
            match = re.search(r'response_chapter_(\d+)\.md', Path(filepath).name)
            if match and int(match.group(1)) in wanted:
                specific_truncated.append(filepath)

        if not specific_truncated:
            print(f"β No truncated files found for chapters {chapter_numbers}!")
            return

        print(f"π― Found {len(specific_truncated)} truncated files for chapters {chapter_numbers}")
        for filepath in specific_truncated:
            print(f" π {Path(filepath).name}")
        print("-" * 50)
        await self._fix_files(specific_truncated, "π Chapter fixing complete!")
def parse_chapter_numbers(chapters_input):
    """Parse a chapter selection such as ``"10,13,15"`` or ``"13-29"``.

    Comma-separated items are either a single number or an inclusive
    ``start-end`` range. Blank items (for example from a trailing comma) are
    ignored.

    Returns:
        The chapter numbers as a list of ints, in the order given.

    Raises:
        ValueError: if an item is not a number or range, or if a range has
            its start greater than its end.
    """
    chapter_numbers = []
    for part in chapters_input.split(','):
        part = part.strip()
        if not part:
            continue
        if '-' in part:
            start_text, _, end_text = part.partition('-')
            start, end = int(start_text), int(end_text)
            if start > end:
                # range() would silently yield nothing for a reversed range.
                raise ValueError(f"invalid chapter range '{part}' (start is greater than end)")
            chapter_numbers.extend(range(start, end + 1))
        else:
            chapter_numbers.append(int(part))
    return chapter_numbers


def main():
    """Interactive entry point: fix all truncated responses or chosen chapters.

    Any failure (including a missing API key) is reported on stdout instead of
    propagating; Ctrl-C is reported as a user interruption.
    """
    try:
        fixer = DeepSeekContinuationFixer()

        print("DeepSeek Continuation Fixer")
        print("=" * 50)
        print("1. Fix all truncated responses")
        print("2. Fix specific chapters")
        choice = input("\nChoose option (1 or 2): ").strip()

        if choice == "1":
            asyncio.run(fixer.fix_all_truncated())
        elif choice == "2":
            chapters_input = input("Enter chapter numbers (e.g., 10,13,15 or 13-29): ").strip()
            try:
                chapter_numbers = parse_chapter_numbers(chapters_input)
            except ValueError as e:
                print(f"Invalid chapter list: {e}")
                return
            if not chapter_numbers:
                print("No chapter numbers entered.")
                return
            asyncio.run(fixer.fix_specific_chapters(chapter_numbers))
        else:
            print("Invalid choice!")
    except KeyboardInterrupt:
        print("\nProcess interrupted by user")
    except Exception as e:
        print(f"Error: {e}")


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment