-
-
Save TheAhmadOsman/fbfdd7ea2dcf269977f26f764599b3df to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import argparse | |
| import json | |
| import logging | |
| import re | |
| import unicodedata | |
| from pathlib import Path | |
| from typing import Optional, Tuple | |
| import lmstudio | |
# --- Configuration ---
# Root logger: timestamped, level-tagged records, INFO and above by default.
logging.basicConfig(
    format="%(asctime)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)

# File suffixes (compared lower-cased) that are accepted as screenshots.
ALLOWED_EXTENSIONS = {".png", ".jpg", ".jpeg", ".webp", ".bmp", ".gif"}

# Category name used when the model supplies none or it sanitizes to nothing.
DEFAULT_CATEGORY = "uncategorized"

# Maximum number of characters kept for the slug and the category name.
MAX_SLUG_LENGTH = 60
MAX_CATEGORY_LENGTH = 30

# Matches a date followed by a time, e.g. YYYY-MM-DD_HH-MM-SS or
# YYYYMMDD_HHMMSS. Separators are optional: '-' or '_' inside the date,
# '_', 'T', space or '-' between date and time, and '-', '_' or ':' inside
# the time. The whole stamp is captured as group 1.
DATE_PATTERN = re.compile(
    r"(\d{4}[-_]?\d{2}[-_]?\d{2}[_T -]?\d{2}[-_:]?\d{2}[-_:]?\d{2})"
)
| # --- Helper Functions --- | |
def extract_date_string(filename: str) -> Optional[str]:
    """Return the first date/time stamp in ``filename``, hyphen-normalized.

    The stamp is located with ``DATE_PATTERN``. Its separators ('_', 'T',
    space and ':') are replaced by '-', and consecutive hyphens are squeezed
    into one. Returns ``None`` when the pattern does not match.
    """
    found = DATE_PATTERN.search(filename)
    if found is None:
        return None

    stamp = found.group(1)
    # Applied in order: unify the separators first, then collapse hyphen runs.
    substitutions = (
        (r"[_T :]", "-"),
        (r"-+", "-"),
    )
    for pattern, replacement in substitutions:
        stamp = re.sub(pattern, replacement, stamp)
    return stamp
def sanitize_filename_part(
    text: str, max_length: int, is_category: bool = False
) -> str:
    """
    Clean ``text`` so it can be used as a filename slug or directory name.

    Steps, in order:
    - Decompose Unicode (NFKD) and drop every non-ASCII character.
    - Convert to lowercase.
    - Replace whitespace and common punctuation/separators with hyphens.
    - Remove anything that is not a-z, 0-9 or a hyphen.
    - Collapse runs of hyphens into one.
    - Truncate to ``max_length`` characters, then strip leading/trailing hyphens.

    Args:
        text: The raw text to clean.
        max_length: Maximum number of characters to keep.
        is_category: Selects the fallback name returned for empty results.

    Returns:
        The cleaned text, or a fallback (``DEFAULT_CATEGORY`` when
        ``is_category`` is true, otherwise "untitled") if the input is empty
        or nothing survives the cleaning.
    """
    default_name = DEFAULT_CATEGORY if is_category else "untitled"
    if not text:
        return default_name

    # NFKD splits accented letters into base letter + combining mark; encoding
    # to ASCII with errors="ignore" then drops the marks and all other
    # non-ASCII characters. Neither call raises for str input, so no
    # exception fallback is required here.
    text = unicodedata.normalize("NFKD", text)
    text = text.encode("ascii", "ignore").decode("ascii")
    text = text.lower()

    # Whitespace and common separators become hyphens ...
    text = re.sub(r"[\s_.:;,!?\"'()[\]{}|\\/@#$%\^&*+=\~`]+", "-", text)
    # ... and every other character outside a-z, 0-9, '-' is removed.
    text = re.sub(r"[^a-z0-9-]", "", text)
    text = re.sub(r"-{2,}", "-", text)

    # Truncate first, then strip, so a cut that lands on a hyphen does not
    # leave a trailing '-'.
    text = text[:max_length].strip("-")
    return text or default_name
def parse_llm_response(response_content: str) -> Tuple[str, str]:
    """
    Extract the raw slug and category text from the model's reply.

    The reply is expected to be a JSON object with string values under the
    keys 'slug' and 'category', optionally wrapped in a Markdown code fence.

    Fallbacks:
    - Reply is not valid JSON: the whole reply (newlines turned into
      hyphens) is used as the slug and the category is the default.
    - Reply is a bare JSON string: it is used as the slug.
    - A key is missing, empty, or holds a non-string value: that field is
      treated as absent; the other field is still used.
    - No usable slug at all: "<sanitized category>-screenshot" is used.

    Returns:
        Tuple[str, str]: (slug_text, category_text). Both are raw text that
        the caller is expected to sanitize further.
    """
    raw_slug_text = ""
    raw_category_text = DEFAULT_CATEGORY  # Start with default

    # Strip a surrounding Markdown code fence (```json ... ``` or ``` ... ```).
    cleaned_content = response_content.strip()
    if cleaned_content.startswith("```json"):
        cleaned_content = cleaned_content[7:]
    elif cleaned_content.startswith("```"):
        cleaned_content = cleaned_content[3:]
    if cleaned_content.endswith("```"):
        cleaned_content = cleaned_content[:-3]
    cleaned_content = cleaned_content.strip()

    try:
        data = json.loads(cleaned_content)
        if isinstance(data, dict):
            slug_value = data.get("slug")
            category_value = data.get("category")

            # Each field is validated on its own: a null/number/list under one
            # key must not discard a perfectly good value under the other.
            if isinstance(slug_value, str):
                raw_slug_text = slug_value.strip()
            elif slug_value is not None:
                logging.warning(
                    f"Ignoring non-string 'slug' value in LLM response: {slug_value!r}"
                )

            if isinstance(category_value, str):
                raw_category_text = category_value.strip() or DEFAULT_CATEGORY
            elif category_value is not None:
                logging.warning(
                    f"Ignoring non-string 'category' value in LLM response: {category_value!r}"
                )

            logging.debug(
                f"Successfully parsed JSON: slug='{raw_slug_text}', category='{raw_category_text}'"
            )
        else:
            logging.warning(
                f"LLM response was valid JSON but not a dictionary: {response_content}"
            )
            # A bare JSON string is still usable as a slug; category stays default.
            if isinstance(data, str):
                raw_slug_text = data.strip()
    except json.JSONDecodeError:
        logging.warning(
            f"LLM response was not valid JSON. Raw response: '{response_content}'"
        )
        # Treat the entire response as a potential slug, with the default category.
        raw_slug_text = response_content.strip().replace("\n", "-").replace("\r", "")
        raw_category_text = DEFAULT_CATEGORY
    except Exception as e:
        logging.error(
            f"Unexpected error parsing LLM response: {e}. Raw response: '{response_content}'"
        )
        raw_slug_text = ""
        raw_category_text = DEFAULT_CATEGORY

    # Final check: the category must never be empty.
    if not raw_category_text:
        raw_category_text = DEFAULT_CATEGORY
        logging.debug("Category was empty after parsing, set to default.")

    # Final check: derive a slug from the category when none was obtained, so
    # the default name is slightly more informative.
    if not raw_slug_text:
        sanitized_category_for_slug = sanitize_filename_part(
            raw_category_text, MAX_CATEGORY_LENGTH, is_category=True
        )
        raw_slug_text = f"{sanitized_category_for_slug}-screenshot"
        logging.debug(
            f"Slug was empty after parsing, set to default derived from category: '{raw_slug_text}'"
        )

    # Return the raw extracted/defaulted text for further sanitization
    return raw_slug_text, raw_category_text
| # --- Main Functions --- | |
# Highest numeric suffix tried when the target file name is already taken.
_MAX_COLLISION_RETRIES = 10


def _build_system_prompt() -> str:
    """Return the system prompt asking the model for a slug/category JSON object."""
    return (
        "You are a highly capable agent specializing in screenshot analysis, categorization, and precise file naming. "
        "Your task is to analyze the content of the screenshot and generate a JSON object containing:\n"
        f'1. "slug": A concise, lowercase, filesystem-safe slug (use hyphens for spaces, only a-z, 0-9, -, max {MAX_SLUG_LENGTH} chars, 5-6 words max) describing the main content or purpose.\n'
        f'2. "category": A single, lowercase, filesystem-safe category word (use hyphens for spaces if needed, only a-z, 0-9, -, max {MAX_CATEGORY_LENGTH} chars) classifying the screenshot.\n\n'
        "**Slug Generation Guidelines (extract maximum context and purpose):**\n"
        "- Messages (chat, email, notification): Reflect sender/topic (e.g. 'slack-message-project-update', 'email-invite-may2025').\n"
        "- UI/Tool/Dashboard/Settings: Specify product/app and purpose (e.g. 'figma-settings-dark-mode', 'github-pr-diff-view', 'grafana-dashboard-cpu-usage').\n"
        "- Code: Include language/platform if visible (e.g. 'python-error-traceback', 'react-component-inspector').\n"
        "- Calendar/Doc/Chart/Table/List: Describe content (e.g. 'google-calendar-april2025', 'asana-task-board', 'excel-finance-chart-q1').\n"
        "- Webpage: Identify the site and main topic (e.g., 'wikipedia-agentic-systems', 'reddit-discussion-llms').\n"
        "- Diagram/Flowchart: Describe the subject (e.g., 'system-architecture-diagram', 'user-flow-signup').\n"
        "- Meme/Media: Identify broadly (e.g. 'meme-distracted-boyfriend', 'youtube-screenshot-timestamped').\n"
        "- Personal/Misc: Describe scene (e.g., 'desktop-screenshot-clean', 'photo-cat-sleeping').\n\n"
        "**Category Generation Guidelines (choose ONE most fitting):**\n"
        f"- Examples: `code`, `ui`, `chat`, `email`, `document`, `webpage`, `diagram`, `chart`, `table`, `calendar`, `social`, `media`, `personal`, `finance`, `gaming`, `map`, `notification`, `settings`, `{DEFAULT_CATEGORY}`\n"
        "- Be specific but general enough for organization (e.g., prefer `chat` over `slack-message`).\n"
        f"- If unsure or the content is too generic, use '{DEFAULT_CATEGORY}'.\n\n"
        "**Output Format:**\n"
        'Output ONLY the JSON object, like this: {"slug": "generated-slug-here", "category": "chosen-category-here"}'
    )


def _compose_filename(slug: str, date_str: str, extension: str) -> str:
    """Join slug, optional date stamp and extension into a file name."""
    # Without a date stamp the '_' separator is omitted, so the result is
    # 'slug.png' rather than 'slug_.png'.
    if date_str:
        return f"{slug}_{date_str}{extension}"
    return f"{slug}{extension}"


def process_screenshot(
    screenshot_path: Path,
    llm: lmstudio.LLM,
    client: lmstudio.Client,
    dry_run: bool = False,
) -> Tuple[bool, str]:
    """
    Process a single screenshot: ask the model for a slug and category, build
    the new name, create the category directory, and move the file into it.

    Args:
        screenshot_path: Path of the candidate file.
        llm: Model handle whose ``respond`` method is queried.
        client: LM Studio client used to prepare the image for the model.
        dry_run: When true, only log and report what would happen.

    Returns:
        Tuple[bool, str]: (success_status, message). Messages for items that
        were deliberately not handled start with "Skipped:"; error messages
        start with "Failed:".
    """
    if not screenshot_path.is_file():
        return False, f"Skipped: '{screenshot_path.name}' is not a file."

    original_filename = screenshot_path.name
    original_extension = screenshot_path.suffix.lower()
    if original_extension not in ALLOWED_EXTENSIONS:
        return (
            False,
            f"Skipped: Non-image extension '{original_extension}' for '{original_filename}'",
        )

    # Files without a recognizable date are still processed; they simply get
    # no date suffix in the new name.
    date_str = extract_date_string(screenshot_path.stem) or ""

    logging.info(f"Processing '{original_filename}'...")
    try:
        # 1. Prepare image for the model
        logging.debug(f"Preparing image '{screenshot_path}'...")
        image_handle = client.prepare_image(screenshot_path)

        # 2. Create chat history for the model (system prompt + image + instruction)
        chat = lmstudio.Chat(_build_system_prompt())
        try:
            text_data = lmstudio.TextData(
                text="Analyze this screenshot and provide the JSON output as instructed."
            )
        except AttributeError:
            # Library builds without TextData get the plain string instead.
            logging.debug(
                "lmstudio.TextData not found, passing raw text string to chat."
            )
            text_data = (
                "Analyze this screenshot and provide the JSON output as instructed."
            )
        chat.add_user_message([image_handle, text_data])

        # 3. Get description (slug) and category from the model
        logging.debug("Querying LLM for slug and category...")
        # A low temperature is used here; raise it for more varied output.
        response = llm.respond(chat, config={"max_tokens": 100, "temperature": 0.1})
        logging.debug(f"LLM Response raw: '{response.content}'")

        # 4. Parse the response and sanitize slug and category
        raw_slug_text, raw_category_text = parse_llm_response(response.content)
        slug = sanitize_filename_part(
            raw_slug_text, max_length=MAX_SLUG_LENGTH, is_category=False
        )
        category_slug = sanitize_filename_part(
            raw_category_text, max_length=MAX_CATEGORY_LENGTH, is_category=True
        )
        # If the slug sanitized away to the generic default but the category
        # is meaningful, derive the slug from the category instead.
        if slug == "untitled" and category_slug != DEFAULT_CATEGORY:
            slug = f"{category_slug}-screenshot"
            logging.debug(f"Refined default slug based on category: '{slug}'")
        logging.info(
            f"LLM suggested: slug='{raw_slug_text}', category='{raw_category_text}' -> "
            f"Sanitized: slug='{slug}', category='{category_slug}'"
        )

        # 5. Construct the new filename and target directory
        target_dir_path = screenshot_path.parent / category_slug
        new_path = target_dir_path / _compose_filename(
            slug, date_str, original_extension
        )

        # 6. Check for potential overwrites or if move is redundant
        if new_path == screenshot_path:
            return (
                False,
                f"Skipped: Target path is the same as original '{original_filename}'",
            )

        # On a name collision, try '<slug>-1' .. '<slug>-N' before giving up.
        attempt = 0
        while new_path.exists():
            attempt += 1
            if attempt > _MAX_COLLISION_RETRIES:
                logging.error(
                    f"Failed: Too many collisions for base name in '{category_slug}'. Skipping."
                )
                return (
                    False,
                    f"Failed: Too many collisions for '{slug}' in '{category_slug}'",
                )
            candidate_name = _compose_filename(
                f"{slug}-{attempt}", date_str, original_extension
            )
            # Log the name that actually collided before switching to the candidate.
            logging.warning(
                f"Collision detected for '{new_path.name}' in category '{category_slug}'. "
                f"Attempting rename with counter: '{candidate_name}'"
            )
            new_path = target_dir_path / candidate_name

        relative_target = f"{category_slug}/{new_path.name}"

        # 7. Create directory and move the file (or only report, if dry run)
        if dry_run:
            if not target_dir_path.exists():
                logging.info(
                    f"[DRY RUN] Would create directory '{target_dir_path.resolve()}'"
                )
            logging.info(
                f"[DRY RUN] Would move '{original_filename}' to '{new_path.resolve()}'"
            )
            return (
                True,
                f"[DRY RUN] Proposed move: '{original_filename}' -> '{relative_target}'",
            )

        try:
            if not target_dir_path.exists():
                logging.info(
                    f"Ensuring directory '{target_dir_path.resolve()}' exists..."
                )
                target_dir_path.mkdir(parents=True, exist_ok=True)
            else:
                logging.debug(
                    f"Directory '{target_dir_path.resolve()}' already exists."
                )
        except OSError as e:
            logging.error(f"Failed to create directory '{target_dir_path}': {e}")
            return False, f"Failed: OS Error creating directory - {e}"

        logging.info(f"Moving '{original_filename}' to '{new_path.resolve()}'")
        try:
            screenshot_path.rename(new_path)
        except OSError as e:
            logging.error(
                f"Failed to move file '{original_filename}' to '{new_path}': {e}"
            )
            return False, f"Failed: OS Error moving file - {e}"
        return True, f"Moved '{original_filename}' to '{relative_target}'"
    except OSError as e:
        logging.error(f"OS error processing '{original_filename}': {e}")
        return False, f"Failed: OS Error - {e}"
    except Exception as e:
        # Boundary for a single file: log with traceback and report failure so
        # the caller can continue with the remaining files.
        logging.exception(f"Unexpected error processing '{original_filename}': {e}")
        return False, f"Failed: Unexpected Error - {e}"
def process_directory(directory: str, model_identifier: str, dry_run: bool = False):
    """
    Process every file directly inside ``directory`` with ``process_screenshot``.

    Args:
        directory: Path of the directory to scan (sub-directories are not entered).
        model_identifier: Identifier passed to ``client.llm.model`` to obtain
            the model handle.
        dry_run: Forwarded to ``process_screenshot``; when true nothing is moved.

    Returns:
        None. Progress, per-file results and a final summary are reported
        through ``logging``; setup errors are logged and end the run early.
    """
    dir_path = Path(directory).resolve()
    if not dir_path.is_dir():
        logging.error(f"Error: Directory not found or not a directory: {directory}")
        return

    try:
        # Assuming LMStudio is running on http://localhost:1234
        # NOTE(review): the client is never explicitly closed here — confirm
        # whether lmstudio.Client needs close()/context-manager handling.
        client = lmstudio.Client()  # Add base_url if needed
        llm = client.llm.model(model_identifier)
        # Prefer the identifier reported by the library, in case it differs
        # from the one that was requested.
        actual_identifier = getattr(llm, "identifier", model_identifier)
        logging.info(f"Using model: {actual_identifier}")
    except AttributeError as e:
        logging.error(
            f"Failed to interact with LMStudio library - possible API mismatch: {e}"
        )
        logging.error(
            "Check if the methods like 'client.llm.model()' or required attributes exist "
            "in your installed 'lmstudio' library version."
        )
        return
    except Exception as e:
        logging.error(f"Unexpected error during LMStudio initialization: {e}")
        return

    logging.info(f"Scanning directory: {dir_path}")
    if dry_run:
        logging.info("--- DRY RUN MODE ---")

    try:
        # Sorted so the processing order is stable between runs.
        items_to_process = sorted(
            item for item in dir_path.iterdir() if item.is_file()
        )
    except OSError as e:
        logging.error(f"Error listing directory contents: {e}")
        return

    if not items_to_process:
        logging.info("No files found in the target directory to process.")
        return

    total_items = len(items_to_process)
    logging.info(f"Found {total_items} potential files to process.")

    processed_count = 0
    skipped_count = 0
    failed_count = 0
    for position, item_path in enumerate(items_to_process, start=1):
        logging.info(f"--- Processing item {position}/{total_items} ---")
        success, message = process_screenshot(item_path, llm, client, dry_run)
        if success:
            processed_count += 1
            logging.info(message)
        elif message.startswith("Skipped:"):
            # Prefix match only: a failure whose error text merely contains
            # "Skipped:" must still be counted as a failure.
            skipped_count += 1
            logging.info(message)
        else:
            failed_count += 1
            logging.warning(message)

    logging.info("--- Processing Complete ---")
    logging.info(f"Successfully processed/proposed: {processed_count}")
    # Files without a date in their name are processed, not skipped, so the
    # label lists only the reasons that really lead to a skip.
    logging.info(f"Skipped (wrong type/redundant/etc.): {skipped_count}")
    logging.info(f"Failed: {failed_count}")
    if dry_run:
        logging.info("--- END DRY RUN ---")
| # --- Entry Point --- | |
def _build_cli_parser() -> argparse.ArgumentParser:
    """Create the command-line parser: directory, --model, --dry-run, --verbose."""
    cli = argparse.ArgumentParser(
        description="Rename and organize screenshot files based on their content using AI and LMStudio.",
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
    )
    cli.add_argument(
        "directory", type=str, help="The directory containing the screenshot files."
    )
    cli.add_argument(
        "-m",
        "--model",
        type=str,
        required=True,
        help="The identifier of the loaded multimodal model in LMStudio (e.g., 'gemma-3-4b-it-qat'). Needs to support image input and ideally JSON output.",
    )
    cli.add_argument(
        "-n",
        "--dry-run",
        action="store_true",
        help="Show what would be moved and categorized without actually changing files.",
    )
    cli.add_argument(
        "-v",
        "--verbose",
        action="store_true",
        help="Enable debug logging for more details.",
    )
    return cli


if __name__ == "__main__":
    cli_args = _build_cli_parser().parse_args()

    # --verbose lowers the root logger threshold from INFO to DEBUG.
    if cli_args.verbose:
        logging.getLogger().setLevel(logging.DEBUG)
        logging.debug("Debug logging enabled.")

    # Hand the directory, model identifier and dry-run flag to the batch driver.
    process_directory(cli_args.directory, cli_args.model, cli_args.dry_run)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment