Skip to content

Instantly share code, notes, and snippets.

@sveetch
Last active October 28, 2021 10:15
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save sveetch/e29af9920f10f6b2b3d93dd92ee9f406 to your computer and use it in GitHub Desktop.
Save sveetch/e29af9920f10f6b2b3d93dd92ee9f406 to your computer and use it in GitHub Desktop.
Filename renamer
"""
Command line script to rename media files.
Prerequisite:
* Python >= 3.6;
* Unicode and Emoji fonts enabled in your terminal console;
"""
import json
import os
# Empty job configuration matrix.
# This is the template dumped to a JSON file by "init_job_configs()", which only
# fills the "basepath" item; every other item is left for the user to edit.
EMPTY_JOB = {
# Optional job title printed before the job runs, a default one is built from
# the configuration filepath when empty
"name": None,
# Directory path where files are listed and renamed (required by validation)
"basepath": None,
# Optional list of file extensions used to filter the directory listing
"extensions": None,
# If true, the file list is reversed before the tasks are applied
"reversed": False,
# List of (task name, task options) pairs applied in order (required by validation)
"tasks": [],
}
def validate_job_files(filepaths):
    """
    Validate that given filepaths exist and hold a usable JSON Job configuration.

    A configuration is considered usable when the file can be read, is valid
    JSON, its top level value is a JSON object and it contains the required
    "basepath" and "tasks" items. Validation never raises for a bad file, every
    problem is reported as an error message instead.

    Arguments:
        filepaths (list): List of filepaths.

    Returns:
        list: List of error message strings, empty when every file is valid.
    """
    errors = []

    for path in filepaths:
        if not os.path.exists(path):
            errors.append(
                "🚨 Configuration file does not exists: {}".format(path)
            )
            continue

        try:
            with open(path, "r") as fp:
                data = json.load(fp)
        except (json.JSONDecodeError, UnicodeDecodeError) as e:
            # Content can not be decoded as JSON, report it with the parser detail
            errors.append(
                "🚨 Configuration file is not a valid JSON file: {}".format(path)
            )
            errors.append(
                " {}".format(str(e))
            )
            continue
        except OSError as e:
            # Path exists but can not be opened (directory, permissions, ...)
            errors.append(
                "🚨 Configuration file can not be read: {}".format(path)
            )
            errors.append(
                " {}".format(str(e))
            )
            continue

        # Required items can only be looked up in a JSON object. A scalar would
        # raise on the "in" operator and a string would be searched as a
        # substring, so both are rejected explicitly.
        if not isinstance(data, dict):
            errors.append(
                "🚨 Configuration file must contain a JSON object: {}".format(path)
            )
            continue

        if "basepath" not in data:
            errors.append(
                (
                    "🚨 Configuration file is missing required 'basepath' "
                    "item: {}"
                ).format(path)
            )

        if "tasks" not in data:
            errors.append(
                (
                    "🚨 Configuration file miss required 'tasks' item: {}"
                ).format(path)
            )

    return errors
def init_job_configs(paths):
    """
    Create an empty JSON Job configuration file for each given directory path.

    The configuration filename is a slug built from the directory path (one
    leading and one trailing "/" removed, remaining "/" turned into "_") with a
    ".json" extension, it is written in the current working directory.

    If a Job configuration file already exists for a directory, it won't be
    overwritten and a warning will be printed. Errors and warnings, including a
    failure to write a configuration file, are printed and don't break the
    process: the next path is processed.

    Arguments:
        paths (list): List of directory path.
    """
    print("Initialize empty JSON Job configuration files for given directories.")
    print()

    for path in paths:
        # Create filename slug from directory path
        slug = path
        if slug.startswith("/"):
            slug = slug[1:]
        if slug.endswith("/"):
            slug = slug[:-1]
        slug = slug.replace("/", "_")

        job_filename = "{}.json".format(slug)

        # Path must exists
        if not os.path.exists(path):
            print(
                (
                    "🚨 Path does not exits: {}"
                ).format(path)
            )
            continue

        # Path must be a directory
        if not os.path.isdir(path):
            print(
                (
                    "🚨 Path is not a directory: {}"
                ).format(path)
            )
            continue

        # Never overwrite an already existing job configuration file
        if os.path.exists(job_filename):
            print(
                (
                    "🚨 A job configuration file already exists for path: {}"
                ).format(path)
            )
            continue

        # Create job configuration file
        data = EMPTY_JOB.copy()
        data["basepath"] = os.path.abspath(path)

        try:
            with open(job_filename, "w") as fp:
                json.dump(data, fp, indent=4)
        except OSError as e:
            # Keep going with the remaining paths as promised by the docstring
            print(
                (
                    "🚨 Unable to write JSON Job configuration file: {} ({})"
                ).format(job_filename, e)
            )
        else:
            # Only announce the creation once the file has really been written
            print(
                (
                    "✅ Created JSON Job configuration file: {}"
                ).format(job_filename)
            )

    return
class AsciiOutputFormatter:
    """
    Output formatter class which provides the format methods for message rows.

    Attributes:
        max_label (integer): Length of the longest label, used to pad labels so
            messages are aligned. It is ``None`` until a length has been
            assigned, in which case labels are not padded.
    """
    def __init__(self):
        self.max_label = None

    def max_label_length(self, tasks):
        """
        Crawl every task to get the biggest name length.

        Arguments:
            tasks (list): List of pairs of task name and task options.

        Returns:
            integer: Length of the longest name or zero if task list is empty.
        """
        if not tasks:
            return 0

        return max(len(name) for name, _options in tasks)

    def _format_row(self, index, label=None, message=None, state=None):
        """
        Format row message from given arguments.

        Arguments:
            index (integer): Index integer for current item in the walked list.

        Keyword Arguments:
            label (string): A label to display surrounded and padded.
            message (string): A message to display.
            state (string): State name can be start, default, debug or end. It will
                define how the row will be formatted. Any other value is
                formatted like the default state.

        Returns:
            string: Formatted message.
        """
        # Select the string start, only the "start" state displays the index
        starts = {
            "start": "┍━{{{}}} ".format(index),
            "debug": "├┄ ",
            "end": "┕━ ",
        }
        start = starts.get(state, "├─ ")

        # Add label surrounding and padding
        label = label or ""
        if label:
            template = "[{}] "
            # No padding is applied while the longest label length is unknown,
            # instead of failing on the addition with None
            padding = self.max_label or 0
            label = template.format(label)
            label = label.ljust(len(template.format("")) + padding)

        # Put message if any
        message = message or ""

        return "".join([start, label, message])
class JobTasks:
    """
    Container for every available tasks.

    Each task method receives the current item index, the job base directory and
    the current filename, and returns a tuple of the given filename and the new
    filename. Tasks only compute names, they never touch the filesystem.

    This class reads ``self.verbose`` and calls ``self._format_row()``, both are
    expected to be provided by the class it is combined with.

    Attributes:
        TASK_DEFINITIONS (dict): Task definitions indexed on task name, each one
            with the name of the method to call and the names of its required
            options if any.
    """
    TASK_DEFINITIONS = {
        "capitalize": {
            "method": "capitalize",
        },
        "lowercase": {
            "method": "lowercase",
        },
        "uppercase": {
            "method": "uppercase",
        },
        "replace": {
            "method": "replace",
            "options": ["from", "to"],
        },
        "add_prefix": {
            "method": "add_prefix",
            "options": ["prefix"],
        },
        "underscore_to_dash": {
            "method": "convert_underscore_to_dash",
        },
        "numerate": {
            "method": "numerate",
            "options": ["start", "zfill"],
        },
        "remove_segment": {
            "method": "remove_segment",
            "options": ["divider", "slice_start"],
        },
    }

    def _debug_row(self, index, label, filename):
        """
        Print the debug row of a task result when verbosity is enabled.

        Arguments:
            index (integer): Index integer for current item in the walked list.
            label (string): Task label to display.
            filename (string): Filename computed by the task.
        """
        if self.verbose:
            print(
                self._format_row(
                    index,
                    label=label,
                    message=filename,
                    state="debug",
                )
            )

    def remove_segment(self, index, basepath, source, **options):
        """
        Remove segments from splitted filename.

        Filename is splitted on a divider and only the segments in the given slice
        are retained, it won't never apply on file extension. If filename have
        only one segment like "foo.txt", nothing is removed. Joiner option can use
        a custom string to join segments, default string if not given is "-".

        If the slice retains nothing, the filename is left unchanged instead of
        being reduced to its extension only.

        TODO:
            Opposed to its name, this does not remove target segment with last+end, but
            instead it retain only segments from given start to end, segment which are
            not in start+end bounds are lost.

        Arguments:
            index (integer): Index integer for current item in the walked list.
            basepath (string): Directory path where to rename files.
            source (string): Source filename.
            **options (dict): Tasks options, require options "divider" and "slice_start" and
                accept "slice_end" and "joiner"

        Returns:
            tuple: A tuple of current filename and new filename.
        """
        divider = options["divider"]
        joiner = options.get("joiner", "-")
        slice_start = options["slice_start"]
        # An empty or zero end means "up to the last segment"
        slice_end = options.get("slice_end", None) or None

        # Split source to get distinct name from extension
        root, exts = os.path.splitext(source)

        # Split name on divider
        segments = root.split(divider)

        new_filename = source
        # Only remove segment if there is more than one
        if len(segments) > 1:
            retained = joiner.join(segments[slice_start:slice_end])
            # Never produce a filename made of the extension only
            if retained:
                new_filename = retained + exts

        self._debug_row(index, "remove_segment", new_filename)

        return (source, new_filename)

    def replace(self, index, basepath, source, **options):
        """
        Replace all occurences of a string (from) by another one (to).

        Just use the common String.replace() method. This is applied on the whole
        filename, including the file extensions.

        Arguments:
            index (integer): Index integer for current item in the walked list.
            basepath (string): Directory path where to rename files.
            source (string): Source filename.
            **options (dict): Tasks options, expect options "from" and "to".

        Returns:
            tuple: A tuple of current filename and new filename.
        """
        from_string = options["from"]
        to_string = options["to"]

        new_filename = source.replace(from_string, to_string)

        self._debug_row(index, "replace", new_filename)

        return (source, new_filename)

    def capitalize(self, index, basepath, source, **options):
        """
        Capitalize file name (work on the whole filename including extension).

        Arguments:
            index (integer): Index integer for current item in the walked list.
            basepath (string): Directory path where to rename files.
            source (string): Source filename.
            **options (dict): Tasks options, this filter does not expect any options.

        Returns:
            tuple: A tuple of current filename and new filename.
        """
        new_filename = source.capitalize()

        self._debug_row(index, "capitalize", new_filename)

        return (source, new_filename)

    def lowercase(self, index, basepath, source, **options):
        """
        Lowercase file name (work on the whole filename including extension).

        Arguments:
            index (integer): Index integer for current item in the walked list.
            basepath (string): Directory path where to rename files.
            source (string): Source filename.
            **options (dict): Tasks options, this filter does not expect any options.

        Returns:
            tuple: A tuple of current filename and new filename.
        """
        new_filename = source.lower()

        self._debug_row(index, "lowercase", new_filename)

        return (source, new_filename)

    def uppercase(self, index, basepath, source, **options):
        """
        Uppercase file name (work on the whole filename including extension).

        Arguments:
            index (integer): Index integer for current item in the walked list.
            basepath (string): Directory path where to rename files.
            source (string): Source filename.
            **options (dict): Tasks options, this filter does not expect any options.

        Returns:
            tuple: A tuple of current filename and new filename.
        """
        new_filename = source.upper()

        self._debug_row(index, "uppercase", new_filename)

        return (source, new_filename)

    def add_prefix(self, index, basepath, source, **options):
        """
        Add a prefix before the filename.

        Arguments:
            index (integer): Index integer for current item in the walked list.
            basepath (string): Directory path where to rename files.
            source (string): Source filename.
            **options (dict): Tasks options, expect an option "prefix".

        Returns:
            tuple: A tuple of current filename and new filename.
        """
        prefix = options["prefix"]

        new_filename = prefix + source

        self._debug_row(index, "add_prefix", new_filename)

        return (source, new_filename)

    def numerate(self, index, basepath, source, **options):
        """
        Add current index position of item in file list before the filename.

        Index number is filled (up to given zfill length) on the left with '0' and
        divided from filename with given divider string.

        Arguments:
            index (integer): Index integer for current item in the walked list.
            basepath (string): Directory path where to rename files.
            source (string): Source filename.
            **options (dict): Tasks options, expect options "start", "zfill" and optional
                "divider".

        Returns:
            tuple: A tuple of current filename and new filename.
        """
        # The "start" value is not used here since the given index is the number
        # to write, but the option stays required like any other missing option
        if "start" not in options:
            raise KeyError("start")
        zfill = options["zfill"]
        divider = options.get("divider", "_")

        new_filename = "{index}{divider}{filename}".format(
            index=str(index).zfill(zfill),
            divider=divider,
            filename=source,
        )

        self._debug_row(index, "numerate", new_filename)

        return (source, new_filename)

    def convert_underscore_to_dash(self, index, basepath, source, **options):
        """
        Convert underscores to dashes in the whole filename:

        * "_" to "-";
        * Then "---" to "_";
        * So from "ping_-_foo_bar.mp4" convert to "ping_foo-bar.mp4".

        Arguments:
            index (integer): Index integer for current item in the walked list.
            basepath (string): Directory path where to rename files.
            source (string): Source filename.
            **options (dict): Tasks options, this filter does not expect any options.

        Returns:
            tuple: A tuple of current filename and new filename.
        """
        new_filename = source.replace("_", "-").replace("---", "_")

        self._debug_row(index, "underscore_to_dash", new_filename)

        return (source, new_filename)
class CommonRenameJob(AsciiOutputFormatter, JobTasks):
    """
    Renaming job manager.

    Attributes:
        TASK_DEFINITIONS (dict): Job definitions, with their method name and required
            options.
        reverse_store (list): Pairs of source path and destination path for every
            file which has been effectively renamed, in renaming order. It can be
            used to restore original filenames.

    Keyword Arguments:
        verbose (boolean): If enabled, task can print out some debug informations. This
            is enabled by default.
        dry_run (boolean): If enabled, task will perform its jobs without writing
            anything on filesystem. This is enabled by default.
    """
    def __init__(self, verbose=True, dry_run=True):
        super().__init__()

        self.verbose = verbose
        self.dry_run = dry_run
        self.reverse_store = []

    def task_option_validate(self, tasks):
        """
        Validate task options.

        Arguments:
            tasks (list): List of task names and their options to apply on each file,
                order does matter.

        Returns:
            list: List of errors if any.
        """
        errors = []

        for name, options in tasks:
            if name not in self.TASK_DEFINITIONS:
                errors.append(
                    " 🔖 There is no task with name '{}'.".format(name)
                )
                continue

            task_method = self.TASK_DEFINITIONS[name]["method"]
            required_options = self.TASK_DEFINITIONS[name].get("options", [])

            for option_name in required_options:
                if option_name not in options:
                    errors.append(
                        " 🔖 Task '{}' require a '{}' option.".format(
                            task_method,
                            option_name
                        )
                    )

        return errors

    def walk_files(self, basepath, sort=True, reverse=False, extensions=None):
        """
        Return list of elligible files from given directory path. This is not recursive.

        Arguments:
            basepath (string): Directory path where to rename files.

        Keyword Arguments:
            sort (boolean): If enabled apply basic alphabetical sorting on directory
                listing.
            reverse (boolean): If enabled the file list will be reversed (after sorting
                if any). This is disabled by default.
            extensions (list): File extensions to filter on. If empty list or None,
                file list will not be filtered against any file extension. If
                not empty, file list will be filtered so only file with one of given
                extension will be used. Each extension has to be given without leading
                dot, a leading dot is tolerated and ignored.

        Returns:
            list: List of filenames, without their directory path.
        """
        files = []

        if self.verbose:
            if self.dry_run:
                print("• Dry run mode enabled, no file will be modified.")
            else:
                print("• Dry run mode disabled, files will be modified.")

            msg = (extensions or ["All"])
            print("• Allowed file extension:", ", ".join(msg))

        # Match on the dotted suffix so extension "mp4" does not select a file
        # which is only ending with these letters, like "trump4"
        suffixes = None
        if extensions:
            suffixes = tuple(
                "." + extension.lstrip(".") for extension in extensions
            )

        for filename in os.listdir(basepath):
            # Ignore file without allowed extension (if any)
            if suffixes and not filename.endswith(suffixes):
                continue

            path = os.path.join(basepath, filename)
            if os.path.isfile(path):
                files.append(filename)

        if sort:
            files.sort()

        if reverse:
            # Reverse in place to keep returning a list as documented
            files.reverse()

        return files

    def perform_path_tasks(self, basepath, tasks, sort=True, reverse=False,
                           extensions=None):
        """
        Perform many tasks on files from given directory path.

        Every task is applied in order on each filename to compute its destination
        filename, then the file is renamed unless dry run mode is enabled, the
        destination already exists or the filename is unchanged. Nothing is done
        if task options are invalid, errors are printed instead.

        Arguments:
            basepath (string): Directory path where to rename files.
            tasks (list): List of task names and their options to apply on each file,
                order does matter.

        Keyword Arguments:
            sort (boolean): If enabled apply basic alphabetical sorting on directory
                listing.
            reverse (boolean): If enabled the file list will be reversed (after sorting
                if any). This is disabled by default.
            extensions (list): File extensions to filter on.
        """
        files = self.walk_files(
            basepath,
            sort=sort,
            extensions=extensions,
            reverse=reverse,
        )

        errors = self.task_option_validate(tasks)
        if errors:
            print("🚨 There is some errors:")
            print("\n".join(errors))
            return

        # Alter enumerate start number from possible options, the last
        # "numerate" task wins if there is more than one
        enumerate_start = 1
        for name, options in tasks:
            if name == "numerate":
                enumerate_start = options["start"]

        self.max_label = self.max_label_length(tasks)

        # Walk on files
        for i, source in enumerate(files, start=enumerate_start):
            source_path = os.path.join(basepath, source)
            # Start destination from original filename
            destination = source

            # Print the starting row
            print()
            print(
                self._format_row(
                    i,
                    message=source_path,
                    state="start",
                )
            )

            # Apply every tasks on file, each one works on the previous result
            for name, options in tasks:
                task_method = self.TASK_DEFINITIONS[name]["method"]
                # Perform task
                paths = getattr(self, task_method)(i, basepath, destination, **options)
                # Update destination filename from returned task value
                destination = paths[1]

            dest_path = os.path.join(basepath, destination)

            print(self._format_row(
                i, message="♻️FROM: {}".format(source)
            ))

            if dest_path == source_path:
                print(self._format_row(
                    i,
                    message=(
                        "🚨 Source and destination paths are identical, there is "
                        "nothing to rename."
                    ),
                    state="end",
                ))
            elif os.path.exists(dest_path):
                print(self._format_row(
                    i,
                    message="🚨 Destination already exists: {}".format(destination),
                    state="end",
                ))
            elif self.dry_run:
                print(self._format_row(
                    i, message="✨ DEST: {}".format(destination), state="end",
                ))
            else:
                print(self._format_row(
                    i, message="✅ DEST: {}".format(destination), state="end",
                ))
                # Perform renaming
                os.rename(source_path, dest_path)
                # Only remember renames which succeeded, so a restore never
                # tries to revert a file which has not been moved
                self.reverse_store.append(
                    (source_path, dest_path)
                )

        return
# Command line entry point: parse arguments then either initialize empty job
# configuration files ('--init') or run the jobs from given configuration files.
if __name__ == '__main__':
    import argparse

    class CustomFormatter(argparse.ArgumentDefaultsHelpFormatter,
                          argparse.RawDescriptionHelpFormatter):
        """
        Implement some argparse formatters
        """
        pass

    parser = argparse.ArgumentParser(
        description=(
            "Rename multiple files from a directories with JSON job configuration files."
            "\n\n"
            "Default behavior is to not change anything, just list task it would do and"
            "\nyou need to enable commit with the '--commit' argument."
            "\n\n"
            "You can't mix job and init modes and so mix Job filepath and directory "
            "paths."
            "\n\n"
            "Finally, if any job task fails all renamed file will be reversed to their "
            "original filename."
        ),
        formatter_class=CustomFormatter,
    )

    # Add arguments and options
    parser.add_argument(
        "paths",
        type=str,
        metavar="PATH",
        nargs="+",
        help=(
            "JSON Job configuration filepath or directory path if 'init' mode is on. "
            "Already existing config will not be overwritten."
        )
    )
    parser.add_argument(
        "--init",
        default=False,
        action="store_true",
        help=(
            "Switch mode to create an empty Job configuration for each given basepath. "
            "Created Job configuration will just have the 'basepath' value set to "
            "given directory path as an absolute path."
        ),
    )
    parser.add_argument(
        "--commit",
        default=False,
        action="store_true",
        help=(
            "Commit job, this is disabled by default to ensure you are testing jobs "
            "before commiting anything on files."
        ),
    )
    parser.add_argument(
        "--no-verbose",
        default=False,
        action="store_true",
        help=(
            "Disable verbosity, this is enabled by default to help debugging jobs "
            "before commiting."
        ),
    )

    args = parser.parse_args()

    if args.init:
        init_job_configs(args.paths)
    else:
        # Parse and validate given job configs
        errors = validate_job_files(args.paths)

        if len(errors) > 0:
            print("\n".join(errors))
        else:
            # Initialize renamer outside of the guarded section, so the restore
            # handler can always rely on it
            renamer = CommonRenameJob(
                dry_run=not(args.commit),
                verbose=not(args.no_verbose),
            )

            # Perform all jobs
            try:
                for path in args.paths:
                    with open(path, "r") as fp:
                        parameters = json.load(fp)

                    name = parameters.get("name", None) or "Job from '{}'".format(path)
                    tasks = parameters.get("tasks", None)
                    extensions = parameters.get("extensions", None)
                    basepath = parameters.get("basepath", None)
                    reverse = parameters.get("reversed", False)

                    print()
                    print("➖" * 30)
                    print(name)
                    print("➖" * 30)

                    renamer.perform_path_tasks(
                        basepath,
                        tasks=tasks,
                        extensions=extensions,
                        reverse=reverse,
                    )
            except BaseException:
                # Every failure is handled, user interruption included, since
                # the original error is always raised again once files have
                # been restored
                print()
                print(
                    (
                        "🚨 An error occured during a job, every files will be "
                        "restored with their original filename."
                    )
                )
                print()

                # Restore renamed file to their original filenames, walking
                # backward so a name which has been freed then reused by a later
                # rename is released before its first owner gets it back
                for src, dst in reversed(renamer.reverse_store):
                    # Ignore an entry for a rename which never happened
                    if not os.path.exists(dst):
                        continue

                    print("*", dst, "=>", src)
                    try:
                        os.rename(dst, src)
                    except OSError as e:
                        # A single failure must not prevent restoring the others
                        print("🚨 Unable to restore '{}': {}".format(dst, e))
                print()

                # Finally raise the original error
                raise
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment