Created
April 24, 2024 13:23
-
-
Save sengstacken/9834992c1bc4235b6b1e3b704747ea64 to your computer and use it in GitHub Desktop.
Get file structure for LLM upload
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import shutil | |
import tarfile | |
from collections import defaultdict | |
import logging | |
# Configure the root logger: INFO and above, each record rendered as
# "<timestamp> - <level> - <message>".
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
def _group_files_by_extension(directory, max_size):
    """Group the regular files directly inside *directory* by extension.

    Files larger than *max_size* bytes are ignored. The key is the
    lower-cased extension including the leading dot ('' for files that
    have no extension, dotfiles such as ``.gitignore`` included).
    Entries are visited in sorted order so each group has a stable
    "first" file regardless of the order ``os.listdir`` returns.
    """
    groups = defaultdict(list)
    for name in sorted(os.listdir(directory)):
        path = os.path.join(directory, name)
        if os.path.isfile(path) and os.path.getsize(path) <= max_size:
            extension = os.path.splitext(name)[1].lower()
            groups[extension].append(path)
    return groups


def _select_files_to_copy(groups, max_types):
    """Yield the paths to keep from each extension group.

    A group with more than *max_types* files is represented by its first
    file only; smaller groups are yielded in full.
    """
    for paths in groups.values():
        if len(paths) > max_types:
            yield paths[0]
        else:
            yield from paths


def _is_same_or_inside(path, parent):
    """Return True when *path* equals *parent* or is located beneath it."""
    try:
        return os.path.commonpath([path, parent]) == parent
    except ValueError:
        # Raised e.g. for paths on different drives: they cannot be nested.
        return False


def copy_and_compress(src_directory, temp_directory, max_types, max_size):
    """Archive a thinned-out copy of a directory tree as ``<temp>.tar.gz``.

    Every directory below *src_directory* is scanned. Within each
    directory, files are grouped by extension; when a group holds more
    than the threshold, only one representative file is kept. The
    selected files are copied into *temp_directory* (preserving their
    relative paths), the staging directory is compressed next to itself
    as ``temp_directory + '.tar.gz'``, and the staging directory is then
    removed.

    Args:
        src_directory: Root of the tree to sample.
        temp_directory: Staging directory. It is created if missing and
            deleted afterwards, including anything it already contained.
        max_types: Maximum number of files per extension, per directory,
            that are copied in full. Despite the name this is a file
            count, not a count of distinct types.
        max_size: Largest file size in bytes that is eligible for copying.

    Raises:
        ValueError: If *src_directory* is *temp_directory* or lies inside
            it, because the final cleanup would delete the source.
        OSError: If scanning, copying or compressing fails. The staging
            directory is still cleaned up in that case.
    """
    # A trailing separator would otherwise produce "<temp>/.tar.gz" and an
    # empty archive root name.
    temp_directory = os.path.normpath(temp_directory)
    source_key = os.path.normcase(os.path.abspath(src_directory))
    temp_key = os.path.normcase(os.path.abspath(temp_directory))
    if _is_same_or_inside(source_key, temp_key):
        raise ValueError(
            f'Source directory {src_directory} must not be the temporary '
            f'directory {temp_directory} or be located inside it'
        )

    if not os.path.exists(temp_directory):
        os.makedirs(temp_directory)
        logging.info(f'Created temporary directory at {temp_directory}')

    try:
        # Walk through the directory structure
        for subdir, dirs, _files in os.walk(src_directory):
            # Prune the staging directory when it lives inside the source
            # tree, so the walk never re-copies its own output. Sorting
            # keeps the traversal order deterministic.
            dirs[:] = sorted(
                name for name in dirs
                if os.path.normcase(
                    os.path.abspath(os.path.join(subdir, name))
                ) != temp_key
            )
            groups = _group_files_by_extension(subdir, max_size)
            for file_path in _select_files_to_copy(groups, max_types):
                relative_path = os.path.relpath(file_path, start=src_directory)
                destination_path = os.path.join(temp_directory, relative_path)
                os.makedirs(os.path.dirname(destination_path), exist_ok=True)
                shutil.copy(file_path, destination_path)
                logging.info(f'Copied {file_path} to {destination_path}')

        # Compress the temporary directory
        tar_path = temp_directory + '.tar.gz'
        with tarfile.open(tar_path, "w:gz") as tar:
            tar.add(temp_directory, arcname=os.path.basename(temp_directory))
        logging.info(f'Compressed directory saved as {tar_path}')
    finally:
        # Clean up temporary files even when copying or compressing failed.
        # Cleanup itself stays best-effort: a failure is logged, not raised.
        try:
            shutil.rmtree(temp_directory)
            logging.info(f'Deleted temporary directory {temp_directory}')
        except OSError as e:
            logging.error(f'Failed to delete temporary directory {temp_directory}: {e}')
# Example usage (placeholder paths): keep every extension group of up to
# 20 files per directory, and skip files larger than 1 MiB. Guarded so that
# importing this module does not trigger a copy/compress run.
if __name__ == '__main__':
    copy_and_compress('/path/to/source', '/path/to/temp', 20, 1024 * 1024)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment