Instantly share code, notes, and snippets.
Last active
July 1, 2020 18:57
-
Save 2ndBillingCycle/e49be623a38284f3b531d8b680ef6b4f to your computer and use it in GitHub Desktop.
Used to find all of the lines edited by a list of users for naev/naev#638
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Seed identities, one per line, of the contributors whose edits are being
# located for naev/naev#638. Each entry becomes the canonical name of one
# Author and the first pattern handed to `git log --author`.
author_names = "\n".join(
    [
        "unavowed",
        "HatlessAtlas",
        "Cypher3c",
        "brognam",
        "EveColonyCommander",
        "iwaschosen",
        "geekt",
    ]
)
import atexit
import operator
import re
import shlex
import sys
from collections import Counter, defaultdict, deque
from dataclasses import dataclass, field
from itertools import chain, compress, groupby, product, repeat, starmap
from pathlib import Path
from subprocess import PIPE, run
from tempfile import NamedTemporaryFile
from typing import Any, Callable, Dict, Iterable, Iterator, List, Optional, Set, Union

from more_itertools import windowed
# Parses one line of `git log --pretty="tformat:%aN <%aE>"` output: the text
# before " <" is captured as author_name, the text between "<" and ">" as
# author_email.
log_re = re.compile(r"^(?P<author_name>.*?) <(?P<author_email>.*?)>")
def shell(cmd: str, check: bool = True) -> str:
    """Run *cmd* through the system shell and return its standard output.

    Args:
        cmd: Command line passed to the shell verbatim (``shell=True``), so
            the caller is responsible for quoting any interpolated value.
        check: When true, a non-zero exit status raises
            ``subprocess.CalledProcessError``.

    Returns:
        The captured stdout decoded as UTF-8. Byte sequences that are not
        valid UTF-8 are replaced with U+FFFD instead of raising. stderr is
        captured and discarded.
    """
    completed = run(cmd, stdout=PIPE, stderr=PIPE, shell=True, check=check)
    # Command output (e.g. blamed file content) is not guaranteed to be
    # UTF-8; a strict decode would abort the whole run on one stray byte.
    return completed.stdout.decode("utf-8", errors="replace")
@dataclass
class Author:
    """A tracked contributor together with every identity attributed to them."""

    # Canonical name used when reporting this author.
    name: str
    # Every author name known to refer to this person.
    alts: Set[str] = field(default_factory=set)
    # Every e-mail address known to refer to this person.
    emails: Set[str] = field(default_factory=set)
    # Maps a file path to the set of line numbers (kept as strings).
    lines: Dict[Path, Set[str]] = field(default_factory=lambda: defaultdict(set))

    def is_alias(self, ident: str) -> bool:
        """Return whether *ident* plausibly refers to this author.

        *ident* matches when it contains, or is contained in, any known name
        or e-mail address.

        Args:
            ident: An author name or e-mail address to test.

        Returns:
            True on a substring match in either direction, otherwise False.
            Empty strings never match: "" is a substring of every string, so
            an empty *ident* or an empty stored alias would otherwise match
            everything.
        """
        if not ident:
            return False
        return any(
            bool(known) and (known in ident or ident in known)
            for known in self.alts | self.emails
        )
# Registry of tracked authors keyed by the seed name from author_names; each
# Author starts out knowing only that seed name as an alias.
Authors = Dict[str, Author]
authors: Authors = dict(
    (seed, Author(name=seed, alts={seed})) for seed in author_names.splitlines()
)
# Alias discovery: query `git log --author` for every known identity, record
# the name/e-mail pairs it returns on the matching Author, and queue each
# newly seen identity for its own query until nothing new turns up.
author_queue = deque(authors)
# Every identity that has already been queued, so each one is queried once.
seen_idents: Set[str] = set(authors)
while author_queue:
    ident = author_queue.pop()
    # Quote the identity: names and e-mails come from commit metadata and may
    # contain quotes, spaces or other shell metacharacters.
    log_output = set(
        shell(
            "git log --regexp-ignore-case --full-history "
            f'--pretty="tformat:%aN <%aE>" --author {shlex.quote(ident)}'
        ).splitlines()
    )
    for i, line in enumerate(log_output):
        match = log_re.match(line)
        if not match:
            raise ValueError(f"log output line #{i} not understood:\n{log_output}")
        author_name = match.group("author_name")
        author_email = match.group("author_email")
        # Empty strings are never tested: "" is a substring of everything and
        # would attach the commit to whichever author is checked first.
        for author in authors.values():
            if (author_name and author.is_alias(author_name)) or (
                author_email and author.is_alias(author_email)
            ):
                break
        else:
            raise ValueError(f"no author matching:\n{match}\n{authors}")
        for new_ident, known in (
            (author_name, author.alts),
            (author_email, author.emails),
        ):
            # An empty identity carries no information and, used as an
            # --author pattern, would not narrow the log at all.
            if not new_ident:
                continue
            known.add(new_ident)
            if new_ident not in seen_idents:
                seen_idents.add(new_ident)
                author_queue.append(new_ident)
# Every non-empty e-mail address attributed to any tracked author. Empty
# addresses are dropped: written to the pattern file they would become an
# empty line, and an empty fixed-string pattern makes grep match every line.
author_emails = {
    email for author in authors.values() for email in author.emails if email
}
# Pattern file (one address per line) handed to `grep --file`; it lives until
# interpreter exit, when closing the handle removes it.
emails_file = NamedTemporaryFile(mode="w")
atexit.register(emails_file.close)
emails_path = Path(emails_file.name)
# Write through the handle that is already open rather than reopening the
# file by name (not possible on every platform while it is open), then flush
# so external commands see the content.
emails_file.write("\n".join(author_emails))
emails_file.flush()
# -z makes git print NUL-terminated paths without quoting or escaping, so
# names containing non-ASCII characters, quotes or newlines round-trip intact.
git_files = [Path(name) for name in shell("git ls-files -z").split("\0") if name]
missing_files = [path for path in git_files if not path.exists()]
if missing_files:
    # Report only the paths that are actually missing.
    raise FileNotFoundError(f"Somehow missing a file:\n{missing_files}")
# Suffixes of binary assets that are excluded from blaming.
ignored_suffixes = [".ttf", ".ogg", ".wav", ".png"]
text_files = [path for path in git_files if path.suffix not in ignored_suffixes]
SPath = Union[str, Path]


def git_blame(path: SPath) -> str:
    """Return the blame lines of *path* that mention a tracked e-mail.

    Runs `git blame -e -w` and keeps only the lines containing one of the
    fixed strings listed in the file at ``emails_path``.

    Args:
        path: Repository-relative file to blame.

    Returns:
        The matching blame lines, or an empty string when none match. The
        exit status is not checked, because the pipeline reports failure
        whenever grep finds nothing.
    """
    # Quote both paths so spaces and shell metacharacters cannot break the
    # command; "--" keeps a path starting with "-" from being read as an option.
    return shell(
        f"git blame -e -w -- {shlex.quote(str(path))} | "
        "grep --directories=skip --fixed-strings "
        f"--file={shlex.quote(str(emails_path))}",
        check=False,
    )


def deep_blame(path: SPath) -> str:
    """Return the full `git blame -e -w -M -C -C` output for *path*.

    Args:
        path: Repository-relative file to blame.

    Returns:
        The complete blame output, one line per line of the file.

    Raises:
        subprocess.CalledProcessError: If git exits with a non-zero status.
    """
    return shell(f"git blame -e -w -M -C -C -- {shlex.quote(str(path))}")
# Parses one line of `git blame -e` output:
#   [^]<sha> [<path>] (<<email>> [<other_path>] <date> <line_number>) <line>
# The optional leading "^" is how git marks lines that come from a boundary
# commit; without accepting it those lines would fail to match.
blame_re = re.compile(
    r"^\^?(?P<sha>\w+)\s+((?P<path>.*?)\s+)?\(<(?P<email>.*?)>\s+((?P<other_path>.*?)\s+)?(?P<date>\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2}\s+[+-]\d{4})\s+(?P<line_number>\d+)\)\s(?P<line>.*)$"
)
def email_to_author(email: str) -> Author:
    """Look up the tracked author who owns *email*.

    Args:
        email: An e-mail address exactly as stored in an Author's ``emails``.

    Returns:
        The first Author in ``authors`` whose ``emails`` set contains *email*.

    Raises:
        KeyError: If no tracked author has that address.
    """
    owner = next(
        (candidate for candidate in authors.values() if email in candidate.emails),
        None,
    )
    if owner is None:
        raise KeyError(f"'{email}' not in {authors}")
    return owner
# For every text file, record which line numbers each tracked author is
# blamed for. The grep-filtered blame is only a cheap pre-check: the detailed
# blame is run just for files where a tracked e-mail shows up at all.
for path in text_files:
    blame = git_blame(path)
    if not blame:
        continue
    matches = [blame_re.match(line) for line in deep_blame(path).splitlines()]
    if len(matches) and not all(matches):
        # Indexes are positions in the blame output, counted from zero.
        unparsed = [str(index) for index, found in enumerate(matches) if not found]
        bad_lines = ", ".join(unparsed)
        print(f"Can't match file {path}: lines {bad_lines}")
    blame_dicts = [
        found.groupdict()
        for found in matches
        if found and found.group("email") in author_emails
    ]
    for blame_dict in blame_dicts:
        owner = email_to_author(blame_dict["email"])
        owner.lines[path].add(blame_dict["line_number"])
def merge_ranges(nums: List[Union[str, int]]) -> List[str]:
    """Collapse line numbers into sorted singles and consecutive ranges.

    Args:
        nums: Line numbers as ints or decimal strings, in any order.
            Duplicates are ignored.

    Returns:
        Strings in ascending order: ``"N"`` for an isolated number and
        ``"A - B"`` for a run of two or more consecutive numbers. For example
        ``[1, 2, 4, 6]`` yields ``["1 - 2", "4", "6"]``. Every input number
        is covered by exactly one returned entry.

    Raises:
        ValueError: If a string element is not a valid integer.
    """
    sorted_nums = sorted({int(num) for num in nums})
    ranges: List[str] = []
    # Within a run of consecutive integers, value minus position is constant,
    # so grouping on that difference yields exactly one group per run.
    for _, run_members in groupby(
        enumerate(sorted_nums), key=lambda pair: pair[1] - pair[0]
    ):
        values = [value for _, value in run_members]
        first, last = values[0], values[-1]
        ranges.append(str(first) if first == last else f"{first} - {last}")
    return ranges
# Prefix of the GitHub blame view for naev/naev, pinned to a single commit;
# repository-relative file paths are appended to it to build links.
base_url = "https://github.com/naev/naev/blame/ac520e35eea287fb2b9589e99889c4f7c7e8804e/"
# Matches the range strings produced by merge_ranges(): a lone number "N", or
# "A - B"; the "end" group is None for a lone number.
range_re = re.compile(r"^(?P<start>\d+)( - (?P<end>\d+))?$")
def markdown_line_links(
    ranges: List[str], file: SPath, base_url: str = base_url
) -> List[str]:
    """Turn line ranges of *file* into Markdown links.

    Args:
        ranges: Strings of the form ``"N"`` or ``"A - B"``.
        file: Path that is appended to *base_url* to form the link target.
        base_url: URL prefix for the links.

    Returns:
        One Markdown link per entry of *ranges*, in the same order, anchored
        at ``#LN`` for a single line or ``#LA-LB`` for a range.

    Raises:
        ValueError: If any entry of *ranges* has neither form.
    """
    target = base_url + str(Path(file))
    parsed = [range_re.match(text) for text in ranges]
    unmatched = [text for text, hit in zip(ranges, parsed) if not hit]
    if unmatched:
        raise ValueError(f"Didn't match: {unmatched}")
    links: List[str] = []
    # filter(None, ...) cannot drop anything here; it only narrows the
    # Optional type for static checkers.
    for hit in filter(None, parsed):
        start = hit.group("start")
        end = hit.group("end")
        if end:
            links.append(f"[{start}-{end}]({target}#L{start}-L{end})")
        else:
            links.append(f"[{start}]({target}#L{start})")
    return links
# Print a nested Markdown list per author: the author's name, then each file
# (linked to its blame view), then the links to that file's blamed lines.
# Child items are indented to the parent's content column (2 and 4 spaces) so
# Markdown renders them as nested rather than sibling list items.
for author in authors.values():
    file_entries = []
    for path, ranges in author.lines.items():
        line_links = ", ".join(markdown_line_links(merge_ranges(list(ranges)), path))
        file_entries.append(
            f"  - [{path}]({base_url + str(path)})\n    - {line_links}"
        )
    edits = "\n".join(file_entries)
    print(f"- {author.name}\n{edits}\n")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Feel free to use this, but I'd recommend against it.
git filter-repo
is a much better project, especially if you're using Python.