-
-
Save budavariam/e946ecde50dee226ed2ec218e0c497d2 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os
import re
import subprocess
from os import path
# Captures whatever sits between the angle brackets of a "Name <email>" line.
EMAIL_PATTERN = re.compile(r'.*<(.*)>')

# Keys of the per-commit dictionaries. They mirror the git environment
# variables used when committing:
# https://git-scm.com/book/en/v2/Git-Internals-Environment-Variables#_committing
FIELD_AUTHOR_EMAIL = 'author_email'
FIELD_AUTHOR_NAME = 'author_name'
FIELD_AUTHOR_DATE = 'author_date'
FIELD_COMMITTER_EMAIL = 'committer_email'
FIELD_COMMITTER_NAME = 'committer_name'
FIELD_COMMITTER_DATE = 'committer_date'

# Custom (non-git) key that holds the message of the generated commit.
FIELD_CUSTOM_MESSAGE = 'message'
def get_email_addresses(repo_location, project_name):
    """Ask the user which author emails of a repository should be synced.

    Runs ``git shortlog --summary --numbered --email HEAD`` in
    *repo_location*, prints every address found together with a zero-based
    index, then reads a whitespace separated list of indexes from stdin.

    Args:
        repo_location: Directory in which the git command is executed.
        project_name: Name used only in the "Skipping ..." message.

    Returns:
        The set of selected email addresses; an empty set when the user
        answers ``-``.

    Raises:
        ValueError: If an entry of the answer is not an integer.
        IndexError: If an index does not refer to a listed address.
    """
    # HEAD is passed explicitly: without a revision, git shortlog reads the
    # log from stdin whenever stdin is not a terminal.
    args = ["git", "--no-pager", "shortlog", "--summary", "--numbered", "--email", "HEAD"]
    print(f"Run cmd for get_email_addresses: {' '.join(args)}")
    # subprocess.run waits for git and closes the pipe for us.
    result = subprocess.run(args=args, stdout=subprocess.PIPE, cwd=repo_location)
    if result.returncode != 0:
        print(f"git shortlog exited with code {result.returncode}")

    emails = []
    for raw_line in result.stdout.splitlines():
        # Decode the bytes; str(bytes) would match against the b'...' repr.
        line = raw_line.decode("utf-8", errors="replace").strip()
        match = EMAIL_PATTERN.match(line)
        if match is None:
            # Lines without a <email> part cannot be selected; skip them.
            continue
        email = match.group(1)
        # The printed number is the index the user has to type back.
        print(f"{len(emails)} - {email}")
        emails.append(email)

    email_list = input("Please mark the email addresses you'd like to sync, and add their numbers as a space separated list, use (-) to skip this project...\n").strip()
    if email_list == '-':
        print(f"Skipping {project_name}...")
        return set()
    return {emails[int(x)] for x in email_list.split()}
def get_filtered_commit_history(repo_location, project_name, emails, filter_by_field=FIELD_AUTHOR_EMAIL):
    """Collect the commits of a repository whose chosen field is in *emails*.

    Runs ``git log --all --reverse`` in *repo_location* with a custom pretty
    format and turns every matching output line into a dictionary keyed by
    the ``FIELD_*`` constants.

    Args:
        repo_location: Directory in which the git command is executed.
        project_name: Text stored under ``FIELD_CUSTOM_MESSAGE`` for every
            commit (used instead of the original commit message).
        emails: Container of values to keep; tested with ``in``.
        filter_by_field: Title of the field compared against *emails*.

    Returns:
        A list of dictionaries, one per matching commit, in the order git
        printed them (``--reverse``).

    Raises:
        ValueError: If *filter_by_field* is not one of the known field titles.
    """
    # ASCII unit separator: unlike ';' it does not occur in names or emails.
    separator = "\x1f"
    # '%' introduces a placeholder in git's pretty format, so a literal
    # project name has to escape it as '%%'.
    message_fmt = str(project_name).replace("%", "%%")
    fields = [
        {"fmt": r"%ae", "title": FIELD_AUTHOR_EMAIL},
        {"fmt": r"%an", "title": FIELD_AUTHOR_NAME},
        {"fmt": r"%ad", "title": FIELD_AUTHOR_DATE},
        {"fmt": r"%ce", "title": FIELD_COMMITTER_EMAIL},
        {"fmt": r"%cn", "title": FIELD_COMMITTER_NAME},
        {"fmt": r"%cd", "title": FIELD_COMMITTER_DATE},
        # Message to display in the commit.
        # The original message (%s) is deliberately NOT used: it can be
        # confidential or can contain special characters.
        {"fmt": message_fmt, "title": FIELD_CUSTOM_MESSAGE},
    ]
    titles = [field["title"] for field in fields]
    filter_index = titles.index(filter_by_field)
    # "%x1f" makes git emit the separator byte between the fields.
    format_str = "%x1f".join(field["fmt"] for field in fields)
    args = [
        "git",
        "--no-pager",  # to get stdout more easily
        "log",
        "--all",  # get data from all branches
        "--date=default",  # keep committer/author's original timestamp
        f"--pretty=format:{format_str}",  # format predefined way
        "--reverse",  # in order to add them in increasing order
    ]
    print(f"Run cmd for get_filtered_commit_history: {' '.join(args)}")
    # subprocess.run waits for git and closes the pipe for us.
    result = subprocess.run(args=args, stdout=subprocess.PIPE, cwd=repo_location)
    if result.returncode != 0:
        print(f"git log exited with code {result.returncode}")

    commits = []
    for raw_line in result.stdout.splitlines():
        # Strip only plain whitespace: str.strip() without arguments would
        # also eat the separator next to an empty first or last field.
        text = raw_line.decode("utf-8", errors="replace").strip(" \t\r\n")
        values = text.split(separator)
        if len(values) != len(titles):
            # Blank or malformed line; there is nothing to map onto the fields.
            continue
        if values[filter_index] in emails:
            commits.append(dict(zip(titles, values)))
    return commits
def create_commits(repo_location, commits, file_name='version.txt'):
    """Replay *commits* as new commits inside the repository at *repo_location*.

    For every entry the author date is written into *file_name*, everything
    is staged with ``git add .`` and a commit is created whose author and
    committer identity and dates are taken from the entry.

    Args:
        repo_location: Directory of the target repository.
        commits: Sequence of dictionaries keyed by the ``FIELD_*`` constants.
        file_name: File (relative to *repo_location*) rewritten per commit.

    Failures of the git commands are reported on stdout; the loop carries on
    with the next entry.
    """
    target_file = path.join(repo_location, file_name)
    total = len(commits)
    for i, commit in enumerate(commits):
        with open(target_file, 'w') as wf:
            wf.write(str(commit.get(FIELD_AUTHOR_DATE)))
        addcmd = subprocess.run(
            args=["git", "add", "."],
            cwd=repo_location
        )
        if addcmd.returncode != 0:
            print(f"Failed to stage changes for commit ({i}): {commit}")

        # These env variables must be set for the proper history. They are
        # passed through env= (on top of the current environment) so that
        # /usr/bin/env is not needed.
        env = dict(os.environ)
        env.update({
            "GIT_AUTHOR_NAME": f"{commit.get(FIELD_AUTHOR_NAME)}",
            "GIT_AUTHOR_EMAIL": f"{commit.get(FIELD_AUTHOR_EMAIL)}",
            "GIT_AUTHOR_DATE": f"{commit.get(FIELD_AUTHOR_DATE)}",
            "GIT_COMMITTER_NAME": f"{commit.get(FIELD_COMMITTER_NAME)}",
            "GIT_COMMITTER_EMAIL": f"{commit.get(FIELD_COMMITTER_EMAIL)}",
            "GIT_COMMITTER_DATE": f"{commit.get(FIELD_COMMITTER_DATE)}",
        })
        commitcmd = subprocess.run([
            "git", "commit", "--quiet",
            # Two consecutive entries with the same author date leave the
            # file unchanged; still record a commit for the second one.
            "--allow-empty",
            "-m", f"{commit.get(FIELD_CUSTOM_MESSAGE)}"
        ], cwd=repo_location, env=env)
        if commitcmd.returncode != 0:
            print(f"Failed to create commit ({i}): {commit}")
        if i % 100 == 0:
            print(f"{i}/{total} done")
    print("Commits created")
def prepare_repo(repo_location, branch_name):
    """Initialise the target repository and switch it to *branch_name*.

    Runs ``git init`` followed by ``git checkout -b <branch_name>`` in
    *repo_location*. Both commands are awaited one after the other, so the
    checkout never starts before the init has finished.

    Args:
        repo_location: Directory of the target repository.
        branch_name: Branch to create, or to switch to if it already exists.

    Raises:
        subprocess.CalledProcessError: If ``git init`` fails, or if the
            branch can neither be created nor checked out.
    """
    init_args = ["git", "init", "--quiet"]
    print(f"Run cmd for prepare_repo: {' '.join(init_args)}")
    subprocess.run(args=init_args, cwd=repo_location, check=True)

    create_args = ["git", "checkout", "--quiet", "-b", branch_name]
    print(f"Run cmd for prepare_repo: {' '.join(create_args)}")
    created = subprocess.run(args=create_args, cwd=repo_location)
    if created.returncode != 0:
        # "-b" refuses to touch an existing branch (e.g. on a re-run);
        # switch to that branch instead of staying on the current one.
        switch_args = ["git", "checkout", "--quiet", branch_name]
        print(f"Run cmd for prepare_repo: {' '.join(switch_args)}")
        subprocess.run(args=switch_args, cwd=repo_location, check=True)
def push_to_remote(repo_location, branch_name='main'):
    """Push *branch_name* to the ``origin`` remote and set it as upstream.

    Args:
        repo_location: Directory of the repository to push from.
        branch_name: Name of the branch to push.

    The combined stdout/stderr of ``git push`` is printed, followed by a
    failure notice when git exits with a non-zero code.
    """
    push_cmd = subprocess.run(
        ["git", "push", "--set-upstream", "origin", branch_name],
        cwd=repo_location,
        # Without a pipe, .stdout is always None and nothing useful is printed.
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        universal_newlines=True,
    )
    if push_cmd.stdout:
        print(push_cmd.stdout)
    if push_cmd.returncode != 0:
        print(f"Failed to push {branch_name}, git exited with code {push_cmd.returncode}")
def process_repo(project_name, src_repo_location, tgt_repo_location, tgt_branch_name):
    """Mirror the user's commits of one source repository into the target.

    Prints an introduction, lets the user pick their email addresses, reads
    the matching commit history from *src_repo_location*, prepares
    *tgt_repo_location* on *tgt_branch_name* and creates the commits there.
    Returns early, doing nothing else, when no email address was selected.
    """
    intro_lines = (
        f"Now we're going to create fake commits for {project_name}",
        f"from: {src_repo_location}",
        f"into: {tgt_repo_location} on branch {tgt_branch_name}",
        "",
        "First I'd like you to select which email addresses belong to you, then I'll create the commits in the prepared folder",
    )
    print("\n".join(intro_lines))

    selected = get_email_addresses(src_repo_location, project_name)
    if not selected:
        return
    print(f"Selected emails: {selected}")

    history = get_filtered_commit_history(src_repo_location, project_name, selected)

    print("Preparing repo...")
    prepare_repo(tgt_repo_location, tgt_branch_name)

    print(f"Creating {len(history)} commits...")
    create_commits(tgt_repo_location, history)
    # Pushing is left disabled:
    # push_to_remote(tgt_repo_location, tgt_branch_name)
def main():
    """Process every configured project, one target branch per project.

    A failure while processing one project is printed and does not stop the
    remaining projects.
    """
    tgt_repo_location = '/projects/keep-history'
    projects = [
        {"project_name": 'project1', "src": '/projects/client1/project1'},
    ]
    for proj in projects:
        project_name = proj.get("project_name")
        # The target branch is named after the project.
        call_kwargs = {
            "project_name": project_name,
            "src_repo_location": proj.get("src"),
            "tgt_repo_location": tgt_repo_location,
            "tgt_branch_name": project_name,
        }
        try:
            process_repo(**call_kwargs)
        except Exception as ex:
            print(f"Failed to process {project_name}, exception: {str(ex)}")
# Run the interactive sync only when executed as a script, not on import.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment