Skip to content

Instantly share code, notes, and snippets.

@softyoda
Created July 19, 2024 15:09
Show Gist options
  • Save softyoda/c4e121b192d3ebd03b92781fe54cd1ef to your computer and use it in GitHub Desktop.
Save softyoda/c4e121b192d3ebd03b92781fe54cd1ef to your computer and use it in GitHub Desktop.
Replace KTX2 textures with PNG ones so glTF files can be imported into Blender without KHR_texture_basisu support
import os
import subprocess
import json
import multiprocessing
from tqdm import tqdm
import time
import tempfile
import shutil
import logging
import glob
import uuid
# Log everything (DEBUG and up) to a file in the current working directory.
logging.basicConfig(filename='conversion_log.txt', level=logging.DEBUG,
                    format='%(asctime)s - %(levelname)s - %(message)s')
# Globally installed gltf-transform CLI (npm install -g @gltf-transform/cli).
# NOTE(review): os.environ.get('APPDATA') returns None off Windows, so this
# os.path.join would raise TypeError there — the script assumes Windows.
GLTF_TRANSFORM_PATH = os.path.join(os.environ.get('APPDATA'), 'npm', 'gltf-transform.cmd')
# Fail fast at import time if the converter is missing.
if not os.path.exists(GLTF_TRANSFORM_PATH):
    raise FileNotFoundError(f"gltf-transform.cmd not found at {GLTF_TRANSFORM_PATH}")
# KTX-Software command-line tool, expected to be on PATH.
KTX_PATH = 'ktx'
def run_subprocess(cmd, check=True):
    """Run *cmd* with captured text output.

    On Windows the child is started without spawning a console window;
    elsewhere no creation flags are applied. Returns the CompletedProcess.
    """
    creation_flags = subprocess.CREATE_NO_WINDOW if os.name == 'nt' else 0
    completed = subprocess.run(
        cmd,
        check=check,
        capture_output=True,
        text=True,
        creationflags=creation_flags,
    )
    return completed
def retry_subprocess(cmd, max_retries=3, delay=1, cwd=None):
    """Run *cmd*, retrying on non-zero exit.

    Each failure is logged with its exit code and stderr; after
    *max_retries* failed attempts the last CalledProcessError is re-raised.
    Between attempts the function sleeps *delay* seconds. On success the
    CompletedProcess is returned.
    """
    attempt = 0
    while True:
        try:
            return subprocess.run(cmd, check=True, capture_output=True, text=True,
                                  creationflags=subprocess.CREATE_NO_WINDOW if os.name == 'nt' else 0,
                                  cwd=cwd)
        except subprocess.CalledProcessError as e:
            logging.error(f"Command failed. Attempt {attempt + 1}/{max_retries}. Exit code: {e.returncode}")
            logging.error(f"Error output: {e.stderr}")
            attempt += 1
            if attempt >= max_retries:
                raise
            time.sleep(delay)
def convert_glb_to_gltf(glb_path, gltf_path):
    """Unpack a binary .glb into a .gltf via gltf-transform's copy command."""
    cmd = [GLTF_TRANSFORM_PATH, 'copy', glb_path, gltf_path]
    return retry_subprocess(cmd)
def convert_ktx2_to_png(ktx2_path, png_path):
    """Extract a KTX2 texture to PNG using the `ktx` CLI.

    Success is judged by whether *png_path* exists afterwards rather than by
    the subprocess result, which was previously bound to an unused local.
    If the CLI keeps failing, retry_subprocess re-raises and the caller
    handles the error.

    Returns True when the PNG file was produced, False otherwise.
    """
    retry_subprocess([KTX_PATH, 'extract', ktx2_path, png_path], max_retries=5)
    return os.path.exists(png_path)
def convert_gltf_to_glb_with_draco(gltf_path, glb_path):
    """Repack a .gltf into a .glb with Draco mesh compression applied."""
    cmd = [GLTF_TRANSFORM_PATH, 'draco', gltf_path, glb_path]
    return retry_subprocess(cmd, max_retries=5)
def create_basic_gltf(gltf_path, png_paths, output_path):
    """Rewrite a glTF so it uses plain PNG textures and no extensions.

    Loads the JSON at *gltf_path*, strips ``extensionsUsed`` and
    ``extensionsRequired``, then either rebuilds ``images``/``textures``/
    ``materials`` from *png_paths* (one entry each per PNG, URIs relative
    to the glTF) or, when *png_paths* is empty, removes those sections
    entirely. Every mesh primitive is pointed at material 0, matching the
    original behavior. The result is written to *output_path* as
    pretty-printed JSON.
    """
    with open(gltf_path, 'r') as f:
        gltf_data = json.load(f)
    # Drop all extension declarations (e.g. KHR_texture_basisu) up front.
    gltf_data.pop('extensionsUsed', None)
    gltf_data.pop('extensionsRequired', None)
    if png_paths:
        gltf_data['images'] = [{'uri': os.path.basename(png_path), 'mimeType': 'image/png'}
                               for png_path in png_paths]
        gltf_data['textures'] = [{'source': i} for i in range(len(png_paths))]
        gltf_data['materials'] = [{
            'pbrMetallicRoughness': {
                'baseColorTexture': {'index': i},
                'metallicFactor': 0.0,
                'roughnessFactor': 1.0
            }
        } for i in range(len(png_paths))]
        # BUG FIX: the original indexed gltf_data['meshes'] unconditionally,
        # raising KeyError on a glTF without meshes; .get() tolerates that,
        # consistent with the guarded access used everywhere else here.
        for mesh in gltf_data.get('meshes', []):
            for primitive in mesh.get('primitives', []):
                primitive['material'] = 0
    else:
        for key in ['images', 'textures', 'materials']:
            gltf_data.pop(key, None)
    with open(output_path, 'w') as f:
        json.dump(gltf_data, f, indent=2)
def convert_gltf_to_glb(gltf_path, glb_path):
    """Pack a .gltf back into a .glb with gltf-transform's pack command.

    Runs with the glTF's directory as the working directory so that
    relative resource URIs (textures, buffers) resolve correctly.
    """
    working_dir = os.path.dirname(gltf_path)
    command = [GLTF_TRANSFORM_PATH, 'pack', gltf_path, glb_path]
    return retry_subprocess(command, max_retries=5, cwd=working_dir)
def verify_glb(glb_path):
    """Cheap structural check of a GLB container header.

    Validates the 12-byte header: the b'glTF' magic, version 2, and a
    declared total length that matches the actual file size. Any I/O
    failure is logged and reported as False rather than raised.
    """
    try:
        with open(glb_path, 'rb') as f:
            header = f.read(12)
        if header[:4] != b'glTF':
            return False
        if int.from_bytes(header[4:8], byteorder='little') != 2:
            return False
        return int.from_bytes(header[8:12], byteorder='little') == os.path.getsize(glb_path)
    except Exception as e:
        logging.error(f"Error verifying GLB file {glb_path}: {str(e)}")
        return False
def process_file(glb_path, overwrite, keep_png):
    """Convert one GLB's KTX2 textures to PNG and repack it without KHR_texture_basisu.

    Pipeline: verify the GLB header, unpack to .gltf in a sibling temp
    directory, extract each .ktx2 image to PNG, strip the
    KHR_texture_basisu extension from the glTF JSON, then repack (with
    Draco) either over the original file (overwrite=True) or to a
    "*_updated.glb" next to it.

    Returns a (success: bool, message: str) tuple; all failures are caught
    and reported in the message rather than raised, so this is safe to run
    in a multiprocessing pool.
    """
    start_time = time.time()
    logging.info(f"Starting to process {glb_path}")
    if not verify_glb(glb_path):
        return False, f"Error processing {os.path.basename(glb_path)}: Invalid GLB file"
    base_name = os.path.splitext(glb_path)[0]
    original_dir = os.path.dirname(glb_path)
    # Unique suffix keeps temp artifacts from colliding when several
    # workers process same-named files concurrently.
    unique_id = uuid.uuid4().hex[:8]  # Generate a unique identifier
    # Temp dir is created inside original_dir so relative texture URIs in
    # the unpacked glTF resolve, and is removed automatically on exit.
    with tempfile.TemporaryDirectory(dir=original_dir, prefix=f"temp_{os.path.basename(base_name)}_{unique_id}_") as temp_dir:
        gltf_path = os.path.join(temp_dir, f"{os.path.basename(base_name)}_{unique_id}.gltf")
        output_gltf_path = os.path.join(temp_dir, f"{os.path.basename(base_name)}_{unique_id}_temp.gltf")
        output_glb_path = glb_path if overwrite else os.path.join(original_dir, f"{os.path.basename(base_name)}_updated.glb")
        try:
            # Convert GLB to GLTF (gltf-transform writes external .ktx2 images
            # into temp_dir alongside the .gltf).
            convert_glb_to_gltf(glb_path, gltf_path)
            with open(gltf_path, 'r') as f:
                gltf_data = json.load(f)
            png_paths = []
            # Extract every KTX2 image to PNG and repoint its URI; images
            # whose extraction fails keep their original .ktx2 URI.
            if 'images' in gltf_data:
                for index, image in enumerate(gltf_data['images']):
                    if 'uri' in image and image['uri'].endswith('.ktx2'):
                        ktx2_path = os.path.join(temp_dir, image['uri'])
                        png_filename = f"{os.path.basename(base_name)}_{unique_id}_texture_{index}.png"
                        png_path = os.path.join(temp_dir, png_filename)
                        if convert_ktx2_to_png(ktx2_path, png_path):
                            png_paths.append(png_path)
                            image['uri'] = png_filename
                            image['mimeType'] = 'image/png'
                            logging.info(f"Converted KTX2 to PNG: {png_path}")
            # Remove KHR_texture_basisu extension from the declared lists.
            if 'extensionsUsed' in gltf_data:
                gltf_data['extensionsUsed'] = [ext for ext in gltf_data['extensionsUsed'] if ext != 'KHR_texture_basisu']
            if 'extensionsRequired' in gltf_data:
                gltf_data['extensionsRequired'] = [ext for ext in gltf_data['extensionsRequired'] if ext != 'KHR_texture_basisu']
            # Update textures: promote the basisu 'source' to the plain
            # texture 'source', then drop the now-empty extensions dict.
            if 'textures' in gltf_data:
                for texture in gltf_data['textures']:
                    if 'extensions' in texture and 'KHR_texture_basisu' in texture['extensions']:
                        source = texture['extensions']['KHR_texture_basisu']['source']
                        texture['source'] = source
                        del texture['extensions']['KHR_texture_basisu']
                        if not texture['extensions']:
                            del texture['extensions']
            # Drop any extensions attached to baseColorTexture references.
            if 'materials' in gltf_data:
                for material in gltf_data['materials']:
                    if 'pbrMetallicRoughness' in material:
                        pbr = material['pbrMetallicRoughness']
                        if 'baseColorTexture' in pbr:
                            if 'extensions' in pbr['baseColorTexture']:
                                del pbr['baseColorTexture']['extensions']
            # Write updated GLTF
            with open(output_gltf_path, 'w') as f:
                json.dump(gltf_data, f, indent=2)
            # Convert updated GLTF back to GLB (Draco compresses the PNGs in).
            convert_gltf_to_glb_with_draco(output_gltf_path, output_glb_path)
            # PNGs are either kept next to the source GLB or deleted;
            # deletion is technically redundant since temp_dir is wiped.
            if keep_png:
                for png_path in png_paths:
                    shutil.move(png_path, os.path.join(original_dir, os.path.basename(png_path)))
            else:
                for png_path in png_paths:
                    os.remove(png_path)
        except Exception as e:
            logging.error(f"Error processing {glb_path}: {str(e)}")
            return False, f"Error processing {os.path.basename(glb_path)}: {str(e)}"
    processing_time = time.time() - start_time
    logging.info(f"Finished processing {glb_path} in {processing_time:.2f} seconds")
    return True, f"Processed {os.path.basename(glb_path)} in {processing_time:.2f} seconds, extracted {len(png_paths)} textures"
def get_glb_files(directory):
    """Recursively map each folder under *directory* to its .glb files.

    Files ending in '_updated.glb' (earlier outputs of this script) are
    skipped, and folders with no matching files are omitted from the
    returned dict of {folder: [full paths]}.
    """
    mapping = {}
    for folder, _dirs, filenames in os.walk(directory):
        matches = [
            os.path.join(folder, name)
            for name in filenames
            if name.endswith('.glb') and not name.endswith('_updated.glb')
        ]
        if matches:
            mapping[folder] = matches
    return mapping
def process_file_wrapper(args):
    """Unpack a (glb_path, overwrite, keep_png) tuple for Pool.imap_unordered,
    which feeds each task as a single argument."""
    glb_path, overwrite, keep_png = args
    return process_file(glb_path, overwrite, keep_png)
def process_directory(directory, overwrite, keep_png):
    """Process every GLB under *directory* in parallel, one pool task per file.

    Fans the files out over cpu_count()-1 workers, printing per-file
    results as they complete and driving a tqdm progress bar.

    Returns (successful_count, total_files, error_messages, total_seconds).
    """
    start_time = time.time()
    glb_files_by_dir = get_glb_files(directory)
    total_files = sum(len(files) for files in glb_files_by_dir.values())
    print(f"Found {total_files} GLB files to process in {len(glb_files_by_dir)} directories.")
    # Leave one core free for the main process / OS.
    num_cores = max(1, multiprocessing.cpu_count() - 1)
    print(f"Using {num_cores} CPU cores for processing.")
    successful = 0
    errors = []
    processed_in_batch = 0
    with multiprocessing.Pool(num_cores) as pool:
        with tqdm(total=total_files, desc="Processing files", bar_format='{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}]\n') as pbar:
            # Directories are dispatched one at a time; results arrive in
            # completion order via imap_unordered.
            for dir_path, files in glb_files_by_dir.items():
                print(f"\nSwitching to directory: {dir_path}")
                for success, message in pool.imap_unordered(process_file_wrapper, [(glb_path, overwrite, keep_png) for glb_path in files]):
                    if success:
                        successful += 1
                        print(message)
                    else:
                        errors.append(message)
                        print(f"Error: {message}")
                    # The bar is advanced in batches to cut terminal churn.
                    # NOTE(review): processed_in_batch is not reset between
                    # directories, so some batches can be missed here...
                    processed_in_batch += 1
                    if processed_in_batch == num_cores or processed_in_batch == len(files):
                        pbar.update(processed_in_batch)
                        processed_in_batch = 0
            # ...which this catch-up compensates for: force the bar to 100%.
            if pbar.n < total_files:
                pbar.update(total_files - pbar.n)
    total_time = time.time() - start_time
    print(f"\nProcessing complete. Successfully processed {successful}/{total_files} files.")
    if errors:
        print("\nErrors encountered:")
        for error in errors:
            print(error)
    return successful, total_files, errors, total_time
if __name__ == '__main__':
    # Hard-coded batch settings: edit these before running.
    directory = r"C:\Users\Yoann\Desktop\test_convert_gltf\extracted\tmp_workflow_convertion_automatique\tmp"
    overwrite = True   # replace each source .glb in place
    keep_png = False   # discard extracted PNGs after repacking
    successful, total_files, errors, total_time = process_directory(directory, overwrite, keep_png)
    print(f"All {total_files} files have been processed. {successful} succeeded, {len(errors)} failed in {total_time:.2f} seconds.")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment