Last active
October 28, 2021 10:15
-
-
Save sveetch/e29af9920f10f6b2b3d93dd92ee9f406 to your computer and use it in GitHub Desktop.
Filename renamer
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Command line script to rename media files.

Prerequisites:

* Python >= 3.6;
* Unicode and Emoji fonts enabled in your terminal console;
""" | |
import json | |
import os | |
# Blank job configuration used as the starting point of every generated JSON Job
# configuration file. Key order is the order written to the file.
EMPTY_JOB = {
    # Title printed before the job runs
    "name": None,
    # Directory holding the files to rename
    "basepath": None,
    # File extensions to filter on, no filtering when empty
    "extensions": None,
    # Whether the file list is walked in reverse order
    "reversed": False,
    # Tasks to apply on each file
    "tasks": [],
}
def validate_job_files(filepaths):
    """
    Validate that given filepaths exist and hold a usable JSON Job configuration.

    A configuration is accepted when the file can be read, contains valid JSON, its
    top-level value is a JSON object and both "basepath" and "tasks" items are
    present. Every problem is collected as a message, nothing is raised for an
    invalid file.

    Arguments:
        filepaths (list): List of filepaths.

    Returns:
        list: List of error message strings, empty when every file is valid.
    """
    errors = []

    for path in filepaths:
        if not os.path.exists(path):
            errors.append(
                "🚨 Configuration file does not exists: {}".format(path)
            )
            continue

        try:
            # JSON documents are UTF-8, do not depend on the locale encoding
            with open(path, "r", encoding="utf-8") as fp:
                data = json.load(fp)
        except OSError as e:
            # Path exists but is a directory, is not readable, etc..
            errors.append(
                "🚨 Configuration file cannot be read: {}".format(path)
            )
            errors.append(
                " {}".format(str(e))
            )
            continue
        except (json.JSONDecodeError, UnicodeDecodeError) as e:
            errors.append(
                "🚨 Configuration file is not a valid JSON file: {}".format(path)
            )
            errors.append(
                " {}".format(str(e))
            )
            continue

        # A scalar would make the "in" checks below crash and a string could even
        # satisfy them by accident, only an object can carry the required items
        if not isinstance(data, dict):
            errors.append(
                "🚨 Configuration file must contain a JSON object: {}".format(path)
            )
            continue

        if "basepath" not in data:
            errors.append(
                (
                    "🚨 Configuration file is missing required 'basepath' "
                    "item: {}"
                ).format(path)
            )

        if "tasks" not in data:
            errors.append(
                (
                    "🚨 Configuration file miss required 'tasks' item: {}"
                ).format(path)
            )

    return errors
def init_job_configs(paths):
    """
    Validate and use each directory path to create an empty JSON Job configuration
    file in the current working directory.

    The configuration filename is built from the given path: a single leading and
    trailing slash are dropped and remaining slashes are turned into underscores.

    If a Job configuration file already exists for a directory, it won't be
    overwritten and a warning will be printed. Errors and warnings don't break the
    process, this includes a configuration file which can not be written.

    Arguments:
        paths (list): List of directory path.
    """
    print("Initialize empty JSON Job configuration files for given directories.")
    print()

    for path in paths:
        # Create filename slug from directory path
        slug = path
        if slug.startswith("/"):
            slug = slug[1:]
        if slug.endswith("/"):
            slug = slug[:-1]
        slug = slug.replace("/", "_")

        job_filename = "{}.json".format(slug)

        # Path must exists
        if not os.path.exists(path):
            print("🚨 Path does not exits: {}".format(path))
            continue

        # Path must be a directory
        if not os.path.isdir(path):
            print("🚨 Path is not a directory: {}".format(path))
            continue

        # Never overwrite an already existing job configuration
        if os.path.exists(job_filename):
            print(
                "🚨 A job configuration file already exists for path: {}".format(path)
            )
            continue

        # Start from the empty matrix (key order preserved), only basepath is set
        data = dict(EMPTY_JOB, basepath=os.path.abspath(path))

        try:
            with open(job_filename, "w") as fp:
                json.dump(data, fp, indent=4)
        except OSError as e:
            # Keep going with the next paths as documented
            print(
                "🚨 Unable to write job configuration file '{}': {}".format(
                    job_filename, e
                )
            )
            continue

        # Only announce the file once it has really been written
        print("✅ Created JSON Job configuration file: {}".format(job_filename))
class AsciiOutputFormatter:
    """
    The output formatter class to include format methods for message rows.

    Rows are drawn as a small tree: a "start" row opens an item, "debug" and
    default rows continue it and an "end" row closes it.

    Attributes:
        max_label (integer): Length of the longest label to display, used to pad
            every label to the same width. It is ``None`` until set, in which case
            labels are not padded.
    """
    def __init__(self):
        self.max_label = None

    def max_label_length(self, tasks):
        """
        Crawl every task to get the bigger name length.

        Arguments:
            tasks (list): List of pairs of task name and task options.

        Returns:
            integer: Length of bigger name or zero if task list is empty.
        """
        if not tasks:
            return 0

        return max(len(name) for name, _ in tasks)

    def _format_row(self, index, label=None, message=None, state=None):
        """
        Format row message from given arguments.

        Arguments:
            index (integer): Index integer for current item in the walked list. It
                is only displayed in the "start" row.

        Keyword Arguments:
            label (string): A label to display surrounded and padded.
            message (string): A message to display.
            state (string): State name can be start, default, debug or end. It will
                define how the row will be formatted. Any other value is formatted
                as the default state.

        Returns:
            string: Formatted message.
        """
        # Select the string start
        if state == "start":
            start = "┍━{{{}}} ".format(index)
        elif state == "debug":
            start = "├┄ "
        elif state == "end":
            start = "┕━ "
        else:
            start = "├─ "

        # Add label surrounding and padding
        label = label or ""
        if label:
            template = "[{}] "
            # Pad on the longest known label plus the surrounding characters, a
            # formatter which has not been given a maximum yet does not pad at all
            width = len(template.format("")) + (self.max_label or 0)
            label = template.format(label).ljust(width)

        # Put message if any
        message = message or ""

        return "".join([start, label, message])
class JobTasks:
    """
    Container for every available task.

    Every task method receives the current item index, the directory path and the
    current filename, then returns a tuple of current filename and new filename.
    Tasks only compute a filename, none of them writes on the filesystem.

    Debug rows rely on a ``verbose`` attribute and a ``_format_row`` method which
    are expected to be supplied by the class this container is combined with.

    Attributes:
        TASK_DEFINITIONS (dict): Available task names, each one with the name of
            the method which implements it and the names of its required options.
    """
    TASK_DEFINITIONS = {
        "capitalize": {
            "method": "capitalize",
        },
        "lowercase": {
            "method": "lowercase",
        },
        "uppercase": {
            "method": "uppercase",
        },
        "replace": {
            "method": "replace",
            "options": ["from", "to"],
        },
        "add_prefix": {
            "method": "add_prefix",
            "options": ["prefix"],
        },
        "underscore_to_dash": {
            "method": "convert_underscore_to_dash",
        },
        "numerate": {
            "method": "numerate",
            "options": ["start", "zfill"],
        },
        "remove_segment": {
            "method": "remove_segment",
            "options": ["divider", "slice_start"],
        },
    }

    def _print_debug_row(self, index, label, new_filename):
        """
        Print the debug row of a task if verbosity is enabled.

        Nothing is printed when the instance has no ``verbose`` attribute.

        Arguments:
            index (integer): Index integer for current item in the walked list.
            label (string): Task label to display.
            new_filename (string): Filename resulting from the task.
        """
        if getattr(self, "verbose", False):
            print(
                self._format_row(
                    index,
                    label=label,
                    message=new_filename,
                    state="debug",
                )
            )

    def remove_segment(self, index, basepath, source, **options):
        """
        Retain a slice of segments from splitted filename.

        Filename, without its last file extension, is splitted on a divider and
        only the segments from "slice_start" up to "slice_end" are retained, then
        joined again with the joiner string. If filename have only one segment like
        "foo.txt" nothing is changed. If the retained slice would leave an empty
        name, the filename is left unchanged instead of being reduced to its
        extension.

        NOTE:
            Opposed to its name, this does not remove a target segment, segments
            which are not in start+end bounds are the ones which are lost.

        Arguments:
            index (integer): Index integer for current item in the walked list.
            basepath (string): Directory path where to rename files.
            source (string): Source filename.
            **options (dict): Tasks options, require options "divider" and
                "slice_start" and accept "slice_end" and "joiner" (default to
                "-"). A "slice_end" which is empty or zero means no end bound.

        Returns:
            tuple: A tuple of current filename and new filename.
        """
        divider = options["divider"]
        joiner = options.get("joiner", "-")
        slice_start = options["slice_start"]
        # Falsy end (None or 0) has always meant "up to the last segment"
        slice_end = options.get("slice_end", None) or None

        # Split source to get distinct name from extension
        root, exts = os.path.splitext(source)

        # Split name on divider
        segments = root.split(divider)

        new_filename = source
        # Only slice segments if there is more than one
        if len(segments) > 1:
            stem = joiner.join(segments[slice_start:slice_end])
            # Out of bounds slices would produce a filename made of the extension
            # alone (or nothing at all), keep the current name in this case
            if stem:
                new_filename = stem + exts

        self._print_debug_row(index, "remove_segment", new_filename)

        return (source, new_filename)

    def replace(self, index, basepath, source, **options):
        """
        Replace all occurences of a string (from) by another one (to).

        Just use the common String.replace() method. This is applied on the whole
        filename, including the file extensions.

        Arguments:
            index (integer): Index integer for current item in the walked list.
            basepath (string): Directory path where to rename files.
            source (string): Source filename.
            **options (dict): Tasks options, expect options "from" and "to".

        Returns:
            tuple: A tuple of current filename and new filename.
        """
        new_filename = source.replace(options["from"], options["to"])

        self._print_debug_row(index, "replace", new_filename)

        return (source, new_filename)

    def capitalize(self, index, basepath, source, **options):
        """
        Capitalize file name (work on the whole filename including extension).

        Only the first character is uppercased, every other character is
        lowercased.

        Arguments:
            index (integer): Index integer for current item in the walked list.
            basepath (string): Directory path where to rename files.
            source (string): Source filename.
            **options (dict): Tasks options, this filter does not expect any options.

        Returns:
            tuple: A tuple of current filename and new filename.
        """
        new_filename = source.capitalize()

        self._print_debug_row(index, "capitalize", new_filename)

        return (source, new_filename)

    def lowercase(self, index, basepath, source, **options):
        """
        Lowercase file name (work on the whole filename including extension).

        Arguments:
            index (integer): Index integer for current item in the walked list.
            basepath (string): Directory path where to rename files.
            source (string): Source filename.
            **options (dict): Tasks options, this filter does not expect any options.

        Returns:
            tuple: A tuple of current filename and new filename.
        """
        new_filename = source.lower()

        self._print_debug_row(index, "lowercase", new_filename)

        return (source, new_filename)

    def uppercase(self, index, basepath, source, **options):
        """
        Uppercase file name (work on the whole filename including extension).

        Arguments:
            index (integer): Index integer for current item in the walked list.
            basepath (string): Directory path where to rename files.
            source (string): Source filename.
            **options (dict): Tasks options, this filter does not expect any options.

        Returns:
            tuple: A tuple of current filename and new filename.
        """
        new_filename = source.upper()

        self._print_debug_row(index, "uppercase", new_filename)

        return (source, new_filename)

    def add_prefix(self, index, basepath, source, **options):
        """
        Add a prefix before given filename.

        Arguments:
            index (integer): Index integer for current item in the walked list.
            basepath (string): Directory path where to rename files.
            source (string): Source filename.
            **options (dict): Tasks options, expect an option "prefix".

        Returns:
            tuple: A tuple of current filename and new filename.
        """
        new_filename = options["prefix"] + source

        self._print_debug_row(index, "add_prefix", new_filename)

        return (source, new_filename)

    def numerate(self, index, basepath, source, **options):
        """
        Add current index position of item in file list.

        Index number is filled (up to given zfill length) on the left with '0' and
        divided from filename with given divider string.

        The "start" option is not read here, it is the caller which uses it to
        define the number given to the first file.

        Arguments:
            index (integer): Index integer for current item in the walked list.
            basepath (string): Directory path where to rename files.
            source (string): Source filename.
            **options (dict): Tasks options, expect option "zfill" and optional
                "divider" (default to "_").

        Returns:
            tuple: A tuple of current filename and new filename.
        """
        new_filename = "{index}{divider}{filename}".format(
            index=str(index).zfill(options["zfill"]),
            divider=options.get("divider", "_"),
            filename=source,
        )

        self._print_debug_row(index, "numerate", new_filename)

        return (source, new_filename)

    def convert_underscore_to_dash(self, index, basepath, source, **options):
        """
        Convert filename with:

        * "_" to "-";
        * then "---" to "_";
        * So from "ping_-_foo_bar.mp4" convert to "ping_foo-bar.mp4".

        This is applied on the whole filename, including the file extensions.

        Arguments:
            index (integer): Index integer for current item in the walked list.
            basepath (string): Directory path where to rename files.
            source (string): Source filename.
            **options (dict): Tasks options, this filter does not expect any options.

        Returns:
            tuple: A tuple of current filename and new filename.
        """
        new_filename = source.replace("_", "-").replace("---", "_")

        self._print_debug_row(index, "underscore_to_dash", new_filename)

        return (source, new_filename)
class CommonRenameJob(AsciiOutputFormatter, JobTasks):
    """
    Renaming job manager.

    Attributes:
        TASK_DEFINITIONS (dict): Job definitions, with their method name and required
            options.
        reverse_store (list): Pairs of source path and destination path for every
            rename which has really been performed, so a caller is able to restore
            original filenames.

    Keyword Arguments:
        verbose (boolean): If enabled, task can print out some debug informations. This
            is enabled by default.
        dry_run (boolean): If enabled, task will perform its jobs without writing
            anything on filesystem. This is enabled by default.
    """
    def __init__(self, verbose=True, dry_run=True):
        super().__init__()

        self.verbose = verbose
        self.dry_run = dry_run
        self.reverse_store = []

    def task_option_validate(self, tasks):
        """
        Validate task options.

        A task is valid when its name is a known task and every option required by
        its definition is given.

        Arguments:
            tasks (list): List of task names and their options to apply on each file,
                order does matter.

        Returns:
            list: List of errors if any.
        """
        errors = []

        for name, options in tasks:
            if name not in self.TASK_DEFINITIONS:
                errors.append(
                    " 🔖 There is no task with name '{}'.".format(name)
                )
                continue

            definition = self.TASK_DEFINITIONS[name]
            task_method = definition["method"]

            for option_name in definition.get("options", []):
                if option_name not in options:
                    errors.append(
                        " 🔖 Task '{}' require a '{}' option.".format(
                            task_method,
                            option_name
                        )
                    )

        return errors

    def walk_files(self, basepath, sort=True, reverse=False, extensions=None):
        """
        Return list of elligible files from given directory path. This is not recursive.

        Arguments:
            basepath (string): Directory path where to rename files.

        Keyword Arguments:
            sort (boolean): If enabled apply basic alphabetical sorting on directory
                listing.
            reverse (boolean): If enabled the file list will be reversed (after sorting
                if any). This is disabled by default.
            extensions (list): File extensions to filter on. If empty list or None,
                file list will not be filtered against any file extension. If
                not empty, file list will be filtered so only file with one of given
                extension will be used. Each extension has to be given without leading
                dot, a leading dot is however tolerated.

        Returns:
            list: Filenames (not paths) of retained files.
        """
        if self.verbose:
            if self.dry_run:
                print("• Dry run mode enabled, no file will be modified.")
            else:
                print("• Dry run mode disabled, files will be modified.")

            print("• Allowed file extension:", ", ".join(extensions or ["All"]))

        # Match on the dotted suffix, else extension "mp4" would also retain a file
        # named "foomp4"
        suffixes = tuple(
            "." + extension.lstrip(".")
            for extension in (extensions or [])
        )

        files = []
        for filename in os.listdir(basepath):
            # Ignore file without allowed extension (if any)
            if suffixes and not filename.endswith(suffixes):
                continue

            # Directories and other non regular files are never renamed
            if os.path.isfile(os.path.join(basepath, filename)):
                files.append(filename)

        if sort:
            files.sort()

        if reverse:
            # In place so a list is always returned, not a one shot iterator
            files.reverse()

        return files

    def perform_path_tasks(self, basepath, tasks, sort=True, reverse=False,
                           extensions=None):
        """
        Perform many tasks on files from given directory path.

        Nothing is done if task options are invalid, errors are only printed. For
        each file every task is applied in order on the filename, then the file is
        renamed unless dry run mode is enabled, destination already exists or
        filename did not change.

        Arguments:
            basepath (string): Directory path where to rename files.
            tasks (list): List of task names and their options to apply on each file,
                order does matter.

        Keyword Arguments:
            sort (boolean): If enabled apply basic alphabetical sorting on directory
                listing.
            reverse (boolean): If enabled the file list will be reversed (after sorting
                if any). This is disabled by default.
            extensions (list): File extensions to filter on.

        Raises:
            OSError: If a file renaming fails. Renames performed before the failure
                are available in ``reverse_store``.
        """
        files = self.walk_files(
            basepath,
            sort=sort,
            extensions=extensions,
            reverse=reverse,
        )

        errors = self.task_option_validate(tasks)
        if errors:
            print("🚨 There is some errors:")
            print("\n".join(errors))
            return

        # Alter enumerate start number from possible options
        enumerate_start = 1
        numerate_options = dict(tasks).get("numerate")
        if numerate_options is not None:
            enumerate_start = numerate_options["start"]

        self.max_label = self.max_label_length(tasks)

        # Walk on files
        for i, source in enumerate(files, start=enumerate_start):
            source_path = os.path.join(basepath, source)
            # Start destination from original filename
            destination = source

            # Print the starting row
            print()
            print(
                self._format_row(
                    i,
                    message=source_path,
                    state="start",
                )
            )

            # Apply every tasks on file, each one works on the filename returned by
            # the previous one
            for name, options in tasks:
                task_method = self.TASK_DEFINITIONS[name]["method"]
                _, destination = getattr(self, task_method)(
                    i, basepath, destination, **options
                )

            dest_path = os.path.join(basepath, destination)

            print(self._format_row(
                i, message="♻️FROM: {}".format(source)
            ))

            if dest_path == source_path:
                print(self._format_row(
                    i,
                    message=(
                        "🚨 Source and destination paths are identical, there is "
                        "nothing to rename."
                    ),
                    state="end",
                ))
            elif os.path.exists(dest_path):
                print(self._format_row(
                    i,
                    message="🚨 Destination already exists: {}".format(destination),
                    state="end",
                ))
            elif self.dry_run:
                print(self._format_row(
                    i, message="✨ DEST: {}".format(destination), state="end",
                ))
            else:
                print(self._format_row(
                    i, message="✅ DEST: {}".format(destination), state="end",
                ))

                # Perform renaming
                os.rename(source_path, dest_path)

                # Only store what has really been renamed, a failed rename must not
                # be part of a further restoration
                self.reverse_store.append(
                    (source_path, dest_path)
                )
if __name__ == '__main__': | |
import argparse | |
class CustomFormatter(argparse.ArgumentDefaultsHelpFormatter, | |
argparse.RawDescriptionHelpFormatter): | |
""" | |
Implement some argparse formatters | |
""" | |
pass | |
parser = argparse.ArgumentParser( | |
description=( | |
"Rename multiple files from a directories with JSON job configuration files." | |
"\n\n" | |
"Default behavior is to not change anything, just list task it would do and" | |
"\nyou need to enable commit with the '--commit' argument." | |
"\n\n" | |
"You can't mix job and init modes and so mix Job filepath and directory " | |
"paths." | |
"\n\n" | |
"Finally, if any job task fails all renamed file will be reversed to their " | |
"original filename." | |
), | |
formatter_class=CustomFormatter, | |
) | |
# Add arguments and options | |
parser.add_argument( | |
"paths", | |
type=str, | |
metavar="PATH", | |
nargs="+", | |
help=( | |
"JSON Job configuration filepath or directory path if 'init' mode is on. " | |
"Already existing config will not be overwritten." | |
) | |
) | |
parser.add_argument( | |
"--init", | |
default=False, | |
action="store_true", | |
help=( | |
"Switch mode to create an empty Job configuration for each given basepath. " | |
"Created Job configuration will just have the 'basepath' value set to " | |
"given directory path as an absolute path." | |
), | |
) | |
parser.add_argument( | |
"--commit", | |
default=False, | |
action="store_true", | |
help=( | |
"Commit job, this is disabled by default to ensure you are testing jobs " | |
"before commiting anything on files." | |
), | |
) | |
parser.add_argument( | |
"--no-verbose", | |
default=False, | |
action="store_true", | |
help=( | |
"Disable verbosity, this is enabled by default to help debugging jobs " | |
"before commiting." | |
), | |
) | |
args = parser.parse_args() | |
if args.init: | |
init_job_configs(args.paths) | |
else: | |
# Parse and validate given job configs | |
errors = validate_job_files(args.paths) | |
if len(errors) > 0: | |
print("\n".join(errors)) | |
else: | |
# Perform all jobs | |
try: | |
# Initialize renamer | |
renamer = CommonRenameJob( | |
dry_run=not(args.commit), | |
verbose=not(args.no_verbose), | |
) | |
# Run jobs | |
for path in args.paths: | |
with open(path, "r") as fp: | |
parameters = json.load(fp) | |
name = parameters.get("name", None) or "Job from '{}'".format(path) | |
tasks = parameters.get("tasks", None) | |
extensions = parameters.get("extensions", None) | |
basepath = parameters.get("basepath", None) | |
reverse = parameters.get("reversed", False) | |
print() | |
print("➖" * 30) | |
print(name) | |
print("➖" * 30) | |
renamer.perform_path_tasks( | |
basepath, | |
tasks=tasks, | |
extensions=extensions, | |
reverse=reverse, | |
) | |
except: | |
# Restore every renamed files before error | |
print() | |
print( | |
( | |
"🚨 An error occured during a job, every files will be " | |
"restored with their original filename." | |
) | |
) | |
print() | |
# Restore renamed file to their original filenames | |
for src, dst in renamer.reverse_store: | |
print("*", dst, "=>", src) | |
os.rename(dst, src) | |
print() | |
# Finally raise the original error | |
raise |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment