Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Select an option
  • Save standardnguyen/7dfebabe515b55d1faf45b0989cddd26 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
"""
DeepSeek API Concurrent Batch Processor
This script reads markdown files from ./prompts_for_raw_chapters/
sends them to DeepSeek API concurrently, and saves responses to ./responses_from_deepseek/
Requirements:
- pip install openai python-dotenv
- Create a .env file with: DEEPSEEK_API_KEY=your_api_key_here
"""
import os
import glob
import time
import asyncio
from pathlib import Path
from openai import AsyncOpenAI
from dotenv import load_dotenv
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
# Load environment variables
load_dotenv()
class DeepSeekConcurrentProcessor:
    """Read markdown prompts from ./prompts_for_raw_chapters/, send them to
    the DeepSeek API concurrently, and save each response to
    ./responses_from_deepseek/."""

    def __init__(self, enable_continuations=True, max_continuations=5):
        """Configure the API clients and the input/output directories.

        Args:
            enable_continuations: if True, also build a beta-endpoint client
                intended for continuing truncated responses.
            max_continuations: upper bound on continuation rounds.

        Raises:
            ValueError: if DEEPSEEK_API_KEY is not set in the environment.
        """
        self.api_key = os.getenv('DEEPSEEK_API_KEY')
        if not self.api_key:
            raise ValueError("DEEPSEEK_API_KEY environment variable is required")
        self.enable_continuations = enable_continuations
        self.max_continuations = max_continuations
        # DeepSeek exposes an OpenAI-compatible API, so the stock OpenAI
        # client works with a swapped base_url.
        self.client = AsyncOpenAI(
            api_key=self.api_key,
            base_url="https://api.deepseek.com"
        )
        # Beta client for continuations.
        # NOTE(review): nothing in this class uses beta_client or
        # max_continuations yet — continuation logic is still a TODO
        # (see send_to_deepseek_async).
        self.beta_client = AsyncOpenAI(
            api_key=self.api_key,
            base_url="https://api.deepseek.com/beta"
        ) if enable_continuations else None
        # Set up directories
        self.prompts_dir = Path("./prompts_for_raw_chapters/")
        self.responses_dir = Path("./responses_from_deepseek/")
        # Create the responses directory (and any missing parents) if needed
        self.responses_dir.mkdir(parents=True, exist_ok=True)

    def read_prompt_file(self, filepath):
        """Return the stripped text of a prompt file, or None on any error."""
        try:
            with open(filepath, 'r', encoding='utf-8') as f:
                return f.read().strip()
        except Exception as e:
            # Best-effort: report and let the caller skip this file.
            print(f"Error reading file {filepath}: {e}")
            return None

    async def send_to_deepseek_async(self, prompt: str, model: str = "deepseek-chat"):
        """Send one prompt to DeepSeek and return the response text.

        Returns None on any API error so the caller can skip the file
        instead of aborting the whole batch.
        """
        try:
            response = await self.client.chat.completions.create(
                model=model,
                messages=[
                    {"role": "user", "content": prompt}
                ],
                max_tokens=8000,  # raise from the default 4096 to the 8000 maximum
                stream=False
            )
            # finish_reason == "length" means the model stopped because it hit
            # max_tokens — the saved response will be incomplete.
            if response.choices[0].finish_reason == "length":
                print("⚠️ Warning: Response was truncated due to max_tokens limit")
                # TODO: continue the generation via self.beta_client
            return response.choices[0].message.content
        except Exception as e:
            print(f"API Error: {e}")
            return None

    def save_response(self, response_text, output_filename):
        """Write response_text to responses_dir/output_filename.

        Returns True on success, False on any write error.
        """
        try:
            output_path = self.responses_dir / output_filename
            with open(output_path, 'w', encoding='utf-8') as f:
                f.write(response_text)
            print(f"✓ Saved response to {output_path}")
            return True
        except Exception as e:
            print(f"Error saving response to {output_filename}: {e}")
            return False

    def get_specific_prompt_files(self, chapter_numbers):
        """Return sorted paths of prompt files for the given chapter numbers.

        Chapters are matched as prompt_chapter_NN.md; a missing chapter
        prints a warning and is skipped.
        """
        files = []
        for chapter_num in chapter_numbers:
            # Zero-pad so e.g. 6 -> "06" matches the on-disk naming scheme
            chapter_str = f"{chapter_num:02d}"
            pattern = str(self.prompts_dir / f"prompt_chapter_{chapter_str}.md")
            matching_files = glob.glob(pattern)
            if matching_files:
                files.extend(matching_files)
            else:
                print(f"Warning: No file found for chapter {chapter_num} (pattern: {pattern})")
        return sorted(files)

    async def process_single_file(self, filepath):
        """Read one prompt file, query DeepSeek, and save the response.

        Returns True on success, False on read/API/save failure.
        """
        filename = Path(filepath).name
        print(f"Starting processing: {filename}")
        # Read the prompt
        prompt = self.read_prompt_file(filepath)
        if prompt is None:
            print(f"✗ Skipped {filename} (read error)")
            return False
        # Send to DeepSeek
        response = await self.send_to_deepseek_async(prompt)
        if response is None:
            print(f"✗ Failed to get response for {filename}")
            return False
        # Convert prompt_chapter_01.md -> response_chapter_01.md
        output_filename = filename.replace("prompt_", "response_")
        # Save response
        if self.save_response(response, output_filename):
            print(f"✓ Completed processing: {filename}")
            return True
        print(f"✗ Failed to save response for {filename}")
        return False

    async def process_files_concurrently(self, chapter_numbers):
        """Process all requested chapters concurrently and print a summary."""
        prompt_files = self.get_specific_prompt_files(chapter_numbers)
        if not prompt_files:
            print(f"No files found for chapters: {chapter_numbers}")
            return
        print(f"Found {len(prompt_files)} files to process concurrently:")
        for file in prompt_files:
            print(f" - {Path(file).name}")
        print(f"Responses will be saved to: {self.responses_dir}")
        print("-" * 50)
        # Launch every file at once; return_exceptions=True keeps one crashed
        # task from cancelling the rest of the batch.
        start_time = time.time()
        tasks = [self.process_single_file(filepath) for filepath in prompt_files]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        # Surface unexpected exceptions instead of silently counting them
        # among the failures.
        for filepath, result in zip(prompt_files, results):
            if isinstance(result, Exception):
                print(f"✗ Unhandled error for {Path(filepath).name}: {result}")
        successful = sum(1 for result in results if result is True)
        failed = len(results) - successful
        elapsed_time = time.time() - start_time
        print("-" * 50)
        print("Concurrent processing complete!")
        print(f"✓ Successful: {successful}")
        print(f"✗ Failed: {failed}")
        print(f"⏱ Total time: {elapsed_time:.2f} seconds")
        print(f"⚡ Average time per file: {elapsed_time/len(results):.2f} seconds")
def main():
    """Entry point: build the processor and run the selected chapters."""
    chapters_to_process = [6, 8, 9, 10, 11, 12]
    try:
        processor = DeepSeekConcurrentProcessor()
        print(f"Processing chapters: {chapters_to_process}")
        # Drive the whole concurrent batch from a single event loop.
        asyncio.run(processor.process_files_concurrently(chapters_to_process))
    except KeyboardInterrupt:
        print("\nProcess interrupted by user")
    except Exception as e:
        print(f"Error: {e}")


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment