# Source: GitHub gist by @BradenM (BradenM/33be29fb13e69fdc9e471b70504895db),
# last active February 15, 2023 23:55.
# NOTE: the other GitHub page chrome captured with this file ("Skip to content",
# "Instantly share code, notes, and snippets.", "Show Gist options", "Save ... to
# your computer and use it in GitHub Desktop.") is not part of the script.
from __future__ import annotations
from thefuzz import fuzz, process
from collections import defaultdict
import shutil
from types import MappingProxyType
from typing import Iterable, Iterator, Mapping
import sh
from contextlib import suppress
from enum import Enum
from rich import print
from loguru import logger
from attrs import define, Factory, field, frozen, converters
from git.repo import Repo
from tempfile import mkdtemp
from pathlib import Path
# Scratch directory for this run: a "ccu-4-backport" folder inside a freshly
# created temporary directory. Both repository checkouts are placed under it.
WORK_DIR = Path(mkdtemp()).joinpath("ccu-4-backport")
WORK_DIR.mkdir(parents=True, exist_ok=True)
class StrEnum(str, Enum):
    """Enum whose members are ``str`` instances and stringify to their value."""

    def __str__(self) -> str:
        """Return the member's value instead of the default ``Class.MEMBER`` form."""
        return self.value
class WorkRepo(StrEnum):
    """Repositories taking part in the backport; each value is a GitHub repo name."""

    V3 = "crisiscleanup-3-web"
    V4 = "crisiscleanup-4-web"

    @property
    def work_path(self) -> Path:
        """Local checkout directory: ``WORK_DIR`` joined with the lower-cased member name."""
        return WORK_DIR / self.name.lower()

    @property
    def remote_url(self) -> str:
        """SSH clone URL of the repository under the ``CrisisCleanup`` GitHub organisation."""
        return f"git@github.com:CrisisCleanup/{self.value}.git"

    @property
    def git(self) -> sh.Command:
        """A ``sh`` git command baked with ``_cwd`` set to :attr:`work_path` and ``no_pager=True``.

        A new baked command is built on every access.
        """
        return sh.git.bake(_cwd=str(self.work_path), no_pager=True)

    def clone_repo(self, to_path: Path | None = None) -> Repo:
        """Clone :attr:`remote_url` and return the resulting ``Repo``.

        Args:
            to_path: Destination directory; falls back to :attr:`work_path`
                when omitted.
        """
        to_path = to_path or self.work_path
        logger.info("cloning ({}) @ {} -> {}", self, self.remote_url, to_path)
        return Repo.clone_from(self.remote_url, to_path)
def convert_paths(values: Iterable[str]) -> list[Path]:
    """Return a list with every item of *values* wrapped in ``Path``, in iteration order."""
    return [Path(value) for value in values]
# attrs converter pipeline: first turn every item into a Path via convert_paths,
# then pass the resulting list to frozenset.
frozen_path_convert = converters.pipe(
    convert_paths,
    frozenset,
)
@frozen
class TreeSet:
    """Immutable snapshot of the paths listed by ``git ls-files`` for one checkout."""

    # Union of `files` and `dirs`.
    paths: frozenset[Path] = field(converter=frozen_path_convert)
    # Every ancestor (``Path.parents``) of every listed file.
    dirs: frozenset[Path] = field(converter=frozen_path_convert)
    # The paths printed by ``git ls-files``.
    files: frozenset[Path] = field(converter=frozen_path_convert)

    @classmethod
    def from_git(cls, git: sh.Command) -> "TreeSet":
        """Build a ``TreeSet`` from the output of ``git("ls-files")``.

        Args:
            git: Callable git command; its stringified output is split into
                one path per line.
        """
        paths = set(str(git("ls-files", _nl=False)).splitlines())
        dirs = set()
        for p in paths:
            # Path.parents yields every ancestor of the file, ending with ".".
            dirs |= set(Path(p).parents)
        return cls(paths=paths | dirs, dirs=dirs, files=paths)

    @property
    def root_files(self) -> frozenset[Path]:
        """Files whose only ancestor is the root, i.e. files at the top level."""
        return frozenset(f for f in self.files if len(f.parents) == 1)
@define
class Context:
    """Mutable state shared across revisions while pairing ``.ts`` files with v3 ``.js`` files."""

    # `.js` files (relative to the v3 checkout) not yet paired with a `.ts` file.
    js_files: set[Path] = field()
    # Maps each paired `.js` path to (its `.ts` replacement, how it was matched).
    file_map: dict[Path, tuple[Path, str]] = Factory(dict)

    @js_files.default
    def _js_files(self):
        """Default for ``js_files``: every ``*.js`` under the v3 checkout, relative to it."""
        # attrs passes the partially initialised instance to this hook.
        return {
            p.relative_to(WorkRepo.V3.work_path)
            for p in WorkRepo.V3.work_path.rglob("*.js")
        }

    @property
    def stem_map(self) -> MappingProxyType[str, Path]:
        """Read-only ``{stem: path}`` view of ``js_files``, rebuilt on every access."""
        return MappingProxyType({p.stem: p for p in self.js_files})

    def find(self, ts_file: Path) -> tuple[Path | None, Path]:
        """Find the tracked ``.js`` file that *ts_file* replaces.

        The ``.js`` twin of *ts_file* (same path, suffix swapped) is tried
        first. Otherwise, if some tracked file shares the stem, the closest
        fuzzy match is accepted when its stem equals ``ts_file.stem``. A match
        only counts if the file still exists in the v3 checkout; it is then
        recorded in ``file_map`` and dropped from ``js_files``.

        Returns:
            ``(js_file, ts_file)`` on a match, ``(None, ts_file)`` otherwise.
        """
        candidate = ts_file.with_suffix(".js")
        # Only bind js_file on a real match. Previously a failed exact lookup
        # left the untracked candidate bound, so it could be recorded with
        # match_type=None.
        js_file: Path | None = None
        match_type: str | None = None

        if candidate in self.js_files:
            js_file, match_type = candidate, "EXACT"
        elif ts_file.stem in self.stem_map:
            result = process.extractOne(
                str(candidate),
                self.js_files,
                scorer=fuzz.token_sort_ratio,
                score_cutoff=90,
            )
            if result and result[0].stem == ts_file.stem:
                js_file, score = result
                match_type = f"FUZZ@{score}"

        if js_file is None or not (WorkRepo.V3.work_path / js_file).exists():
            return None, ts_file

        # Logged once per recorded rename (EXACT matches used to be logged twice).
        logger.info("rename [{}]: ({} ~> {})", match_type, js_file, ts_file)
        self.file_map[js_file] = (ts_file, match_type)
        self.js_files.discard(js_file)
        return js_file, ts_file
def names_in_rev(git: sh.Command, rev: str) -> Mapping[str, set[Path]]:
    """Group the paths printed by ``git show <rev>`` by file suffix.

    Args:
        git: Git command object exposing ``show``.
        rev: Revision to inspect.

    Returns:
        A ``defaultdict`` mapping each suffix (e.g. ``".ts"``) to the set of
        paths with that suffix; looking up an absent suffix gives an empty set.
    """
    res = git.show(rev, name_only=True, format="")
    suffix_map: defaultdict[str, set[Path]] = defaultdict(set)
    for line in str(res).splitlines():
        path = Path(line)
        suffix_map[path.suffix].add(path)
    return suffix_map
def pick_rev(git: sh.Command, rev: str):
    """Cherry-pick *rev* using the given git command.

    The cherry-pick is baked with the keyword options ``x``, ``allow_empty``,
    ``keep_redundant_commits``, ``m=1`` and ``X="theirs"``.

    Args:
        git: Git command object to bake the cherry-pick from.
        rev: Revision to cherry-pick.
    """
    cherry_pick = git.bake(
        "cherry-pick",
        x=True,
        allow_empty=True,
        keep_redundant_commits=True,
        m=1,
        X="theirs",
    )
    logger.info("picking (rev={})", rev)
    cherry_pick(rev)
def backport_rev(git: sh.Command, context: Context, rev: str):
    """Remove the ``.js`` files replaced by ``.ts`` files in *rev*, then amend the commit.

    Does nothing when *rev* touches no ``.ts`` file. Otherwise every ``.ts``
    path is looked up through ``context.find``; each matched ``.js`` file is
    ``git rm``-ed, and the current commit is amended reusing *rev*'s message.

    Args:
        git: Git command object for the checkout being rewritten.
        context: Pairing state used to locate the ``.js`` counterparts.
        rev: Revision whose changed files are inspected.
    """
    ts_files = names_in_rev(git, rev)[".ts"]
    if not ts_files:
        logger.info("no .ts changes made in rev ({}), skipping.", rev)
        return

    for file in ts_files:
        js_file, ts_file = context.find(file)
        if js_file is None:
            continue
        logger.info("removing ({}@{})", js_file, ts_file)
        git.rm(str(js_file), _fg=True)

    git.commit(amend=True, a=True, no_edit=True, reuse_message=rev, _fg=True)
def cleanup_files():
    """Report v3 paths absent from v4 and ``git rm`` the ones that look unused.

    Candidates are the root-level files tracked only in v3, plus v3-only
    directories that directly contain ``.js`` files but no ``.ts`` or ``.vue``
    files. A candidate is kept when it no longer exists on disk or when its
    path contains one of the ``preserves`` markers. The removals are committed
    only if at least one path was actually removed.
    """
    v3_ts = TreeSet.from_git(WorkRepo.V3.git)
    v4_ts = TreeSet.from_git(WorkRepo.V4.git)

    root_diff = v3_ts.root_files - v4_ts.root_files
    print("Root files diff:")
    print(root_diff)

    dirs_diff = v3_ts.dirs - v4_ts.dirs
    print("Dirs diff:")
    print(dirs_diff)

    def has(ext: str, dirs: frozenset[Path], *, none: bool = False) -> Iterator[Path]:
        """Yield dirs that directly contain a ``*.ext`` file (or, with ``none``, that do not)."""
        for d in dirs:
            first = next((WorkRepo.V3.work_path / d).glob(f"*.{ext}"), None)
            if (first is None) == none:
                yield d

    dirs_no_ts = set(has("ts", dirs_diff, none=True))
    dirs_no_vue = set(has("vue", dirs_diff, none=True))
    dirs_has_js = set(has("js", dirs_diff))
    print(dict(no_ts=dirs_no_ts, no_vue=dirs_no_vue, js=dirs_has_js))

    dirs_no = dirs_no_ts & dirs_no_vue & dirs_has_js
    print(f"Likely delete dirs ({len(dirs_no)}/{len(dirs_diff)}):")
    print(dirs_no)

    # Substring markers: any path containing one of these is never removed.
    preserves = {"mock", "test", "stories", "storybook", "internal", "LICENSE", "spec"}
    remove_paths = dirs_no | root_diff
    removed_any = False
    for path in remove_paths:
        work_path = WorkRepo.V3.work_path / path
        if not work_path.exists():
            continue
        if any(marker in str(path) for marker in preserves):
            continue
        logger.info("removing ({})", path)
        WorkRepo.V3.git.rm(str(path), _fg=True, r=True)
        removed_any = True

    if not removed_any:
        # `git commit` exits non-zero when there is nothing to commit.
        logger.info("no likely unused paths removed, skipping cleanup commit.")
        return

    WorkRepo.V3.git.commit(
        a=True, no_edit=True, message="chore: remove likely unused paths."
    )
def prepare_workspaces():
    """Create fresh v3 and v4 checkouts and make v4 fetchable from v3.

    Any existing checkout directories are deleted, both repositories are
    cloned, and the local v4 checkout is added to v3 as the remote ``v4``
    before fetching all remotes.
    """
    logger.info("using working dir: {}", WORK_DIR)
    for repo in (WorkRepo.V3, WorkRepo.V4):
        # Suppress per directory: a missing v3 checkout must not skip the
        # removal of a stale v4 checkout.
        with suppress(FileNotFoundError):
            shutil.rmtree(repo.work_path)

    WorkRepo.V3.clone_repo()
    WorkRepo.V4.clone_repo()
    WorkRepo.V3.git.remote.add("v4", str(WorkRepo.V4.work_path))
    WorkRepo.V3.git.fetch(all=True, _fg=True)
def get_revs(branch: str = "master") -> list[str]:
    """Return the v4 revisions listed by ``git rev-list <branch>`` with ``reverse=True``.

    Args:
        branch: Ref to list revisions from; defaults to ``"master"``.

    Returns:
        One revision per output line, in the order git printed them.
    """
    output = WorkRepo.V4.git("rev-list", branch, reverse=True, _nl=False)
    revs = str(output).splitlines()
    logger.info("revs ({})", revs)
    return revs
def main():
    """Run the backport: prepare checkouts, replay every v4 revision onto v3, clean up."""
    prepare_workspaces()
    revs = get_revs()
    # Built after cloning: the default js_files scans the v3 checkout on disk.
    ctx = Context()
    for rev in revs:
        pick_rev(WorkRepo.V3.git, rev)
        backport_rev(WorkRepo.V3.git, ctx, rev)
    cleanup_files()
    print("Renames:", ctx.file_map)
    logger.success("Done!")
    logger.info("Results directory: {}", WORK_DIR)
# Run the backport only when executed as a script, not on import.
if __name__ == "__main__":
    main()
# (GitHub page footer, not part of the script: "Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment")