Last active
February 15, 2023 23:55
-
-
Save BradenM/33be29fb13e69fdc9e471b70504895db to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from __future__ import annotations | |
from thefuzz import fuzz, process | |
from collections import defaultdict | |
import shutil | |
from types import MappingProxyType | |
from typing import Iterable, Iterator, Mapping | |
import sh | |
from contextlib import suppress | |
from enum import Enum | |
from rich import print | |
from loguru import logger | |
from attrs import define, Factory, field, frozen, converters | |
from git.repo import Repo | |
from tempfile import mkdtemp | |
from pathlib import Path | |
# Per-run scratch area: a "ccu-4-backport" folder inside a freshly created
# temporary directory.
WORK_DIR = Path(mkdtemp()).joinpath("ccu-4-backport")
WORK_DIR.mkdir(parents=True, exist_ok=True)
class StrEnum(str, Enum):
    """Enum whose members are strings and whose ``str()`` is the member value."""

    def __str__(self) -> str:
        member_value: str = self.value
        return member_value
class WorkRepo(StrEnum):
    """The two repositories this script operates on.

    Each member's value is the repository name used to build its remote URL.
    """

    V3 = "crisiscleanup-3-web"
    V4 = "crisiscleanup-4-web"

    @property
    def work_path(self) -> Path:
        """Local checkout directory: ``WORK_DIR`` / lower-cased member name."""
        return WORK_DIR.joinpath(self.name.lower())

    @property
    def remote_url(self) -> str:
        """SSH URL of the repository under the ``CrisisCleanup`` GitHub account."""
        return f"git@github.com:CrisisCleanup/{self.value}.git"

    @property
    def git(self) -> sh.Command:
        """A ``git`` command baked with ``no_pager`` and ``_cwd`` set to ``work_path``."""
        checkout = str(self.work_path)
        return sh.git.bake(_cwd=checkout, no_pager=True)

    def clone_repo(self, to_path: Path | None = None) -> Repo:
        """Clone ``remote_url`` and return the resulting ``Repo``.

        Args:
            to_path: Destination directory; ``work_path`` is used when falsy.
        """
        if not to_path:
            to_path = self.work_path
        logger.info("cloning ({}) @ {} -> {}", self, self.remote_url, to_path)
        return Repo.clone_from(self.remote_url, to_path)
def convert_paths(values: Iterable[str]) -> list[Path]:
    """Return a list holding one ``Path`` per item of ``values``, in order."""
    return list(map(Path, values))
# attrs converter pipeline: turn the items into ``Path`` objects with
# ``convert_paths``, then freeze the result into a ``frozenset``.
frozen_path_convert = converters.pipe(
    convert_paths,
    frozenset,
)
@frozen
class TreeSet:
    """Immutable snapshot of the paths listed by ``git ls-files``.

    Every field is run through ``frozen_path_convert``, so each one ends up
    as a ``frozenset`` of ``Path`` objects.
    """

    # As built by ``from_git``: the union of ``files`` and ``dirs``.
    paths: frozenset[Path] = field(converter=frozen_path_convert)
    # As built by ``from_git``: every ancestor directory of every listed file.
    dirs: frozenset[Path] = field(converter=frozen_path_convert)
    # As built by ``from_git``: the paths printed by ``git ls-files``.
    files: frozenset[Path] = field(converter=frozen_path_convert)

    @classmethod
    def from_git(cls, git: sh.Command) -> "TreeSet":
        """Build a ``TreeSet`` from the output of ``git("ls-files")``.

        Args:
            git: Command invoked as ``git("ls-files", _nl=False)``; its
                stringified output is split into one path per line.
        """
        listing = str(git("ls-files", _nl=False))
        tracked = set(listing.splitlines())
        parent_dirs = {
            ancestor for name in tracked for ancestor in Path(name).parents
        }
        return cls(paths=tracked | parent_dirs, dirs=parent_dirs, files=tracked)

    @property
    def root_files(self) -> frozenset[Path]:
        """Files with exactly one entry in ``Path.parents``."""
        top_level = (entry for entry in self.files if len(entry.parents) == 1)
        return frozenset(top_level)
@define
class Context:
    """Tracks which V3 ``.js`` files have been paired with a ``.ts`` file.

    ``js_files`` starts as every ``*.js`` file found under the V3 checkout
    and shrinks as ``find`` pairs entries; the pairs are kept in ``file_map``.
    """

    # Paths (relative to the V3 checkout) of ``.js`` files not yet paired.
    js_files: set[Path] = field()
    # Paired files: js path -> (ts path, match description such as "EXACT").
    file_map: dict[Path, tuple[Path, str]] = Factory(dict)

    @js_files.default
    def _js_files(self) -> set[Path]:
        """Default for ``js_files``: all ``*.js`` files under the V3 checkout."""
        root = WorkRepo.V3.work_path
        return {found.relative_to(root) for found in root.rglob("*.js")}

    @property
    def stem_map(self) -> MappingProxyType[str, Path]:
        """Read-only ``stem -> path`` view of the current ``js_files``.

        Rebuilt on every access; when several files share a stem only one of
        them is kept.
        """
        return MappingProxyType({p.stem: p for p in self.js_files})

    def find(self, ts_file: Path) -> tuple[Path | None, Path]:
        """Find the not-yet-paired ``.js`` file that ``ts_file`` supersedes.

        An exact match (same path with a ``.js`` suffix) is tried first. If
        that fails and some remaining ``.js`` file shares the stem, the best
        fuzzy match (``token_sort_ratio``, cutoff 90) is accepted provided
        its stem equals the stem of ``ts_file``.

        A match is only recorded when the file exists under the V3 checkout;
        it is then stored in ``file_map`` and removed from ``js_files``.

        Returns:
            ``(js_file, ts_file)`` on a recorded match, otherwise
            ``(None, ts_file)``.
        """
        candidate = ts_file.with_suffix(".js")
        # Only bound once a match is confirmed, so an unmatched candidate can
        # never reach the bookkeeping below with a ``None`` match type.
        js_file: Path | None = None
        match_type: str | None = None

        if candidate in self.js_files:
            js_file, match_type = candidate, "EXACT"
        elif ts_file.stem in self.stem_map:
            result = process.extractOne(
                str(candidate),
                self.js_files,
                scorer=fuzz.token_sort_ratio,
                score_cutoff=90,
            )
            if result and result[0].stem == ts_file.stem:
                js_file, score = result
                match_type = f"FUZZ@{score}"

        if js_file is None or match_type is None:
            return None, ts_file
        if not (WorkRepo.V3.work_path / js_file).exists():
            return None, ts_file

        # Logged once per recorded match, exact or fuzzy.
        logger.info("rename [{}]: ({} ~> {})", match_type, js_file, ts_file)
        self.file_map[js_file] = (ts_file, match_type)
        self.js_files.discard(js_file)
        return js_file, ts_file
def names_in_rev(git: sh.Command, rev: str) -> Mapping[str, set[Path]]:
    """Group the paths reported by ``git show`` for ``rev`` by file suffix.

    Args:
        git: Object whose ``show`` is called with ``name_only=True`` and an
            empty ``format``; its stringified output is read line by line.
        rev: Revision passed to ``git.show``.

    Returns:
        A ``defaultdict`` mapping each suffix (e.g. ``".ts"``) to the set of
        paths carrying it; unknown suffixes yield an empty set.
    """
    by_suffix: defaultdict[str, set[Path]] = defaultdict(set)
    listing = str(git.show(rev, name_only=True, format=""))
    for changed in map(Path, listing.splitlines()):
        by_suffix[changed.suffix].add(changed)
    return by_suffix
def pick_rev(git: sh.Command, rev: str):
    """Cherry-pick ``rev`` with the given ``git`` command.

    The ``cherry-pick`` subcommand is baked with the keyword options ``x``,
    ``allow_empty``, ``keep_redundant_commits``, ``m=1`` and ``X="theirs"``
    before being invoked on ``rev``.
    """
    options = {
        "x": True,
        "allow_empty": True,
        "keep_redundant_commits": True,
        "m": 1,
        "X": "theirs",
    }
    run_pick = git.bake("cherry-pick", **options)
    logger.info("picking (rev={})", rev)
    run_pick(rev)
def backport_rev(git: sh.Command, context: Context, rev: str):
    """Remove the ``.js`` files superseded by ``rev``'s ``.ts`` files and amend.

    For each ``.ts`` path reported for ``rev``, ``context.find`` is asked for
    a matching ``.js`` file; every match is ``git rm``-ed. The current commit
    is then amended, reusing the message of ``rev``. Nothing is done when the
    revision lists no ``.ts`` paths.
    """
    ts_changes = names_in_rev(git, rev)[".ts"]
    if not ts_changes:
        logger.info("no .ts changes made in rev ({}), skipping.", rev)
        return

    for ts_path in ts_changes:
        matched_js, matched_ts = context.find(ts_path)
        if matched_js is not None:
            logger.info("removing ({}@{})", matched_js, matched_ts)
            git.rm(str(matched_js), _fg=True)

    git.commit(amend=True, a=True, no_edit=True, reuse_message=rev, _fg=True)
def cleanup_files():
    """Remove V3 paths that are missing from V4 and look unused, then commit.

    Candidates are the root-level files tracked in V3 but not in V4, plus the
    directories tracked only in V3 that directly contain ``.js`` files but no
    ``.ts`` or ``.vue`` files. A candidate is skipped when it no longer
    exists on disk or when its path contains one of the ``preserves``
    markers. Intermediate results are printed along the way.
    """
    v3_root = WorkRepo.V3.work_path
    v3_tree = TreeSet.from_git(WorkRepo.V3.git)
    v4_tree = TreeSet.from_git(WorkRepo.V4.git)

    root_diff = v3_tree.root_files - v4_tree.root_files
    print("Root files diff:")
    print(root_diff)

    dirs_diff = v3_tree.dirs - v4_tree.dirs
    print("Dirs diff:")
    print(dirs_diff)

    def dirs_where(
        ext: str, dirs: frozenset[Path], *, none: bool = False
    ) -> Iterator[Path]:
        # Yield directories that directly contain a ``*.ext`` file, or, with
        # ``none=True``, those that contain no such file.
        for rel_dir in dirs:
            first_hit = next((v3_root / rel_dir).glob(f"*.{ext}"), None)
            if (first_hit is None) == none:
                yield rel_dir

    without_ts = set(dirs_where("ts", dirs_diff, none=True))
    without_vue = set(dirs_where("vue", dirs_diff, none=True))
    with_js = set(dirs_where("js", dirs_diff))
    print({"no_ts": without_ts, "no_vue": without_vue, "js": with_js})

    candidates = without_ts & without_vue & with_js
    print(f"Likely delete dirs ({len(candidates)}/{len(dirs_diff)}):")
    print(candidates)

    preserves = {"mock", "test", "stories", "storybook", "internal", "LICENSE", "spec"}
    v3_git = WorkRepo.V3.git
    for rel_path in candidates | root_diff:
        if not (v3_root / rel_path).exists():
            continue
        # Substring match against the whole relative path.
        if any(marker in str(rel_path) for marker in preserves):
            continue
        logger.info("removing ({})", rel_path)
        v3_git.rm(str(rel_path), _fg=True, r=True)

    v3_git.commit(
        a=True, no_edit=True, message="chore: remove likely unused paths."
    )
def prepare_workspaces():
    """Create fresh V3 and V4 checkouts and link V4 into V3 as a remote.

    Any existing checkout directory is deleted first. Both repositories are
    then cloned, the local V4 checkout is added to V3 as the remote ``v4``,
    and all remotes are fetched in V3.
    """
    logger.info("using working dir: {}", WORK_DIR)
    # Suppress per checkout: with a single shared ``suppress`` block, a
    # missing V3 directory would skip the removal of the V4 directory.
    for repo in (WorkRepo.V3, WorkRepo.V4):
        with suppress(FileNotFoundError):
            shutil.rmtree(repo.work_path)
    WorkRepo.V3.clone_repo()
    WorkRepo.V4.clone_repo()
    WorkRepo.V3.git.remote.add("v4", str(WorkRepo.V4.work_path))
    WorkRepo.V3.git.fetch(all=True, _fg=True)
def get_revs():
    """Return the revisions of V4's ``master`` as a list of strings.

    The list is the line-split output of ``git rev-list master`` run with
    ``reverse=True`` in the V4 checkout; it is also logged.
    """
    output = WorkRepo.V4.git("rev-list", "master", reverse=True, _nl=False)
    revs = str(output).splitlines()
    logger.info("revs ({})", revs)
    return revs
def main():
    """Run the whole backport.

    Prepares the checkouts, then cherry-picks each V4 revision onto V3 and
    removes the superseded ``.js`` files. Finally it cleans up leftover
    paths and reports the recorded renames and the results directory.
    """
    prepare_workspaces()
    revs = get_revs()
    ctx = Context()
    v3_git = WorkRepo.V3.git
    for rev in revs:
        pick_rev(v3_git, rev)
        backport_rev(v3_git, ctx, rev)
    cleanup_files()
    print("Renames:", ctx.file_map)
    logger.success("Done!")
    logger.info("Results directory: {}", WORK_DIR)


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment