Skip to content

Instantly share code, notes, and snippets.

@jeetsukumaran
Created March 29, 2020 17:45
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jeetsukumaran/dd9c01c23bc97cb54f90429be25e00b0 to your computer and use it in GitHub Desktop.
Save jeetsukumaran/dd9c01c23bc97cb54f90429be25e00b0 to your computer and use it in GitHub Desktop.
#! /usr/bin/env python3
################################################################################
# Copyright 2020 Jeet Sukumaran
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
################################################################################
import sys
import os
import argparse
import pathlib
import glob
import datetime
class Colors(object):
    """
    ANSI SGR escape sequences for coloring and styling terminal output.

    Adapted from:
    https://gist.github.com/rene-d/9e584a7dd2935d0f461904b9f2950007

    Every public attribute is a string meant to be written in front of the
    text it should affect; ``END`` resets all attributes. The check at the
    bottom of the class body runs once, when the class is created: if
    standard output is not a terminal, every public attribute is replaced by
    the empty string so that piped or redirected output stays free of escape
    sequences.
    """

    BLACK = "\033[0;30m"
    RED = "\033[0;31m"
    GREEN = "\033[0;32m"
    BROWN = "\033[0;33m"
    BLUE = "\033[0;34m"
    PURPLE = "\033[0;35m"
    CYAN = "\033[0;36m"
    LIGHTGRAY = "\033[0;37m"
    DARKGRAY = "\033[1;30m"
    LIGHTRED = "\033[1;31m"
    LIGHTGREEN = "\033[1;32m"
    YELLOW = "\033[1;33m"
    LIGHTBLUE = "\033[1;34m"
    LIGHTPURPLE = "\033[1;35m"
    LIGHTCYAN = "\033[1;36m"
    LIGHTWHITE = "\033[1;37m"
    BOLD = "\033[1m"
    FAINT = "\033[2m"
    ITALIC = "\033[3m"
    UNDERLINE = "\033[4m"
    BLINK = "\033[5m"
    NEGATIVE = "\033[7m"
    CROSSED = "\033[9m"
    END = "\033[0m"

    # cancel SGR codes if we don't write to a terminal
    if not __import__("sys").stdout.isatty():
        # At class scope ``locals()`` is the namespace the class is built
        # from, so assigning through it rebinds the attributes defined above.
        # ``dir()`` is evaluated once, before the loop variable exists.
        for _name in dir():
            if not _name.startswith("_"):
                locals()[_name] = ""
        # Do not leave the loop variable behind as a class attribute.
        del _name
    else:
        # set Windows console in VT mode
        if __import__("platform").system() == "Windows":
            kernel32 = __import__("ctypes").windll.kernel32
            # -11 selects the standard output handle; mode 7 turns on
            # processed output, wrap-at-EOL and virtual terminal processing.
            kernel32.SetConsoleMode(kernel32.GetStdHandle(-11), 7)
            del kernel32
class NoColors(object):
    """
    Color table in which every code is the empty string.

    Exposes the same attribute names as ``Colors`` so the two classes can be
    used interchangeably; formatting with these values adds nothing to the
    output.
    """

    # Color names.
    BLACK = RED = GREEN = BROWN = BLUE = PURPLE = CYAN = LIGHTGRAY = ""
    DARKGRAY = LIGHTRED = LIGHTGREEN = YELLOW = ""
    LIGHTBLUE = LIGHTPURPLE = LIGHTCYAN = LIGHTWHITE = ""

    # Text style names and the terminating code.
    BOLD = FAINT = ITALIC = UNDERLINE = BLINK = NEGATIVE = CROSSED = ""
    END = ""
def timestamp():
    """
    Return the current local time as a compact string.

    The layout is ``YYYYMMDD-HHMMSSffffff``: date, a dash, then time of day
    followed directly by the six-digit microsecond field.
    """
    now = datetime.datetime.now()
    return now.strftime("%Y%m%d-%H%M%S%f")
def logfilepath(idx=0):
    """
    Build a timestamped log file name.

    Parameters
    ----------
    idx : int
        Disambiguation counter. ``0`` (the default) adds nothing; any other
        value is appended after the timestamp as a dot followed by the
        number zero-padded to three digits.

    Returns
    -------
    str
        ``flattendirs.<timestamp>[.<idx>].log``
    """
    suffix = f".{idx:03d}" if idx != 0 else ""
    return "flattendirs.{}{}.log".format(timestamp(), suffix)
def compose_path_str(
    path,
    is_dir,
    parent_color,
    final_dir_color,
    final_file_color,
    end_color,
):
    """
    Render ``path`` with its parent part and final component colored apart.

    Parameters
    ----------
    path : pathlib path
        Path to render; its ``parent`` and ``name`` attributes are used.
    is_dir : bool
        Selects ``final_dir_color`` (true) or ``final_file_color`` (false)
        for the final component.
    parent_color : str
        Code written before the parent part.
    final_dir_color : str
        Code written before the final component of a directory.
    final_file_color : str
        Code written before the final component of a non-directory.
    end_color : str
        Code written after the parent part and again after the final
        component.

    Returns
    -------
    str
        Parent, ``os.sep`` and name, each wrapped in its color codes.
    """
    name_color = final_dir_color if is_dir else final_file_color
    parent_part = f"{parent_color}{path.parent}{os.sep}{end_color}"
    name_part = f"{name_color}{path.name}{end_color}"
    return parent_part + name_part
def main():
    """
    Flatten the directory tree below the current working directory.

    Every entry found below the top level (optionally limited by
    ``--max-depth``) is moved directly into the current directory. Name
    clashes are resolved by inserting a zero-padded counter between the stem
    and the suffix. Top-level directories that are empty afterwards are
    removed. Unless ``--no-log`` is given, a log file is written that lists
    the operations performed and, as commented-out shell commands, the
    operations that would reverse them; the log is also written on a dry
    run, where it describes the planned moves.

    Command-line options are read from ``sys.argv`` by ``argparse``.
    """
    # Only needed here, for quoting paths written to the log.
    import shlex

    parser = argparse.ArgumentParser(description=None)
    parser.add_argument("-d", "--max-depth",
            action="store",
            type=int,
            default=None,
            help="Restrict maximum depth to this number of levels.")
    parser.add_argument("-n", "--dry-run", "--no-execute",
            action="store_true",
            default=False,
            help="Do not actually make any changes.")
    parser.add_argument("--no-colors",
            action="store_true",
            default=False,
            help="Do not colorize messages.")
    parser.add_argument("--no-log",
            action="store_true",
            default=False,
            help="Do not log operations.")
    parser.add_argument("--logfile",
            metavar="LOGFILEPATH",
            dest="logpath",
            default=None,
            help="Path to logfile.")
    parser.add_argument("-q", "--quiet",
            action="store_true",
            default=False,
            help="Do not report changes.")
    args = parser.parse_args()
    dest_path = pathlib.Path(".")

    # Compare against None rather than relying on truthiness: an explicit
    # depth of 0 must select nothing, not fall through to "unlimited".
    if args.max_depth is not None:
        # One pattern per level: "*/*", "*/*/*", ...
        glob_patterns = [
            "/".join(["*"] * (depth + 1))
            for depth in range(1, args.max_depth + 1)
        ]
    else:
        glob_patterns = ["**/*"]
    entries = [
        pathlib.Path(f)
        for glob_pattern in glob_patterns
        for f in glob.glob(glob_pattern, recursive=True)
    ]
    # Deepest entries first, so the contents of a directory are moved out
    # before the directory itself is moved.
    entries.sort(key=lambda f: len(f.parents), reverse=True)
    # Entries already at the top level need no move.
    entries = [e for e in entries if e.parent != dest_path]

    colors = NoColors if args.no_colors else Colors
    move_color = colors.BOLD + colors.LIGHTGREEN
    to_color = colors.GREEN
    end_color = colors.END
    remove_color = colors.LIGHTRED
    colors_d = {
        "parent_color": colors.BLUE,
        "final_dir_color": colors.LIGHTBLUE,
        "final_file_color": "",
        "end_color": colors.END,
    }

    # Destinations already claimed in this run; needed on a dry run, where
    # nothing is actually created on disk.
    seen = set()
    mv_log_table = []
    for entry in entries:
        new_path = dest_path / entry.name
        collision_idx = 0
        # lexists() also reports dangling symlinks, which rename() would
        # otherwise silently replace.
        while os.path.lexists(new_path) or new_path in seen:
            collision_idx += 1
            candidate_name = "{}.{:03d}{}".format(entry.stem, collision_idx, entry.suffix)
            new_path = dest_path / candidate_name
        seen.add(new_path)
        mv_log_table.append((entry, new_path))
        if not args.quiet:
            entry_is_dir = entry.is_dir()
            src_str = compose_path_str(
                path=entry,
                is_dir=entry_is_dir,
                **colors_d,
            )
            dest_str = compose_path_str(
                path=new_path,
                is_dir=entry_is_dir,
                **colors_d,
            )
            print(f"- {move_color}Moving:{end_color} {src_str}\n {to_color}To:{end_color} {dest_str}")
        if not args.dry_run:
            entry.rename(new_path)

    rmdir_log_table = []
    # Take a snapshot of the listing before removing anything from it.
    for p in sorted(dest_path.iterdir()):
        # rmdir() cannot remove a symlink, even one pointing at an empty
        # directory, so symlinks are left alone.
        if p.is_symlink() or not p.is_dir():
            continue
        if os.listdir(p):
            continue
        if not args.quiet:
            dest_str = compose_path_str(
                path=p,
                is_dir=True,
                **colors_d,
            )
            print(f"- {remove_color}Removing empty directory:{end_color} {dest_str}")
        if not args.dry_run:
            rmdir_log_table.append(p)
            p.rmdir()

    if not args.no_log:
        if args.logpath is not None:
            logpath = pathlib.Path(args.logpath)
        else:
            # Pick a default name that does not clash with an existing file.
            disambiguator = 0
            logpath = pathlib.Path(logfilepath(disambiguator))
            while logpath.exists():
                disambiguator += 1
                logpath = pathlib.Path(logfilepath(disambiguator))

        def shell_quote(path):
            # The logged commands are meant to be uncommented and run, so
            # names with spaces or shell metacharacters must be quoted.
            return shlex.quote(str(path))

        with open(logpath, "w") as dest:
            dest.write("#! /bin/bash\n")
            dest.write("set -e -o pipefail\n")
            dest.write("\n## Following operations were carried out:\n\n")
            for src, dst in mv_log_table:
                dest.write("# mv {} {}\n".format(shell_quote(src), shell_quote(dst)))
            for removed in rmdir_log_table:
                dest.write("# rmdir {}\n".format(shell_quote(removed)))
            dest.write("\n## Following operations reverse the flattening:\n\n")
            # Undo in the opposite order: recreate directories, then move
            # shallow entries back before the deeper ones that live in them.
            for removed in reversed(rmdir_log_table):
                dest.write("# mkdir -p {}\n".format(shell_quote(removed)))
            for src, dst in reversed(mv_log_table):
                dest.write("# mv {} {}\n".format(shell_quote(dst), shell_quote(src)))
        print(f"- Log file written to: {logpath}")

    if not mv_log_table and not rmdir_log_table:
        print("- No suitable files/directories found to be flattened")
    else:
        print(f"- Completed: {len(mv_log_table)} filesystem entries moved")
# Run the command-line interface only when executed as a script, not when
# imported as a module.
if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment