-
-
Save CasperTheCat/669d0e408e7ccc5e2b92c1338eade789 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Sourced: https://gist.github.com/CasperTheCat/669d0e408e7ccc5e2b92c1338eade789 | |
import json | |
import subprocess | |
def GetLockedFiles(): | |
SubProcess = subprocess.run(["git", "lfs", "locks", "--verify", "--json"], capture_output=True) | |
if SubProcess.returncode == 0: | |
LockJSON = SubProcess.stdout.decode().lstrip().rstrip() | |
return True, json.loads(LockJSON) | |
else: | |
return False, None | |
def GitPush(Remote, Branch): | |
SubProcess = subprocess.run(["git", "push", "{}".format(Remote), "{}".format(Branch)], capture_output=True) | |
if SubProcess.returncode == 0: | |
LockJSON = SubProcess.stdout.decode().lstrip().rstrip() | |
return True, LockJSON | |
else: | |
return False, None | |
def GetBranchName(): | |
SubProcess = subprocess.run(["git", "branch", "--show-current"], capture_output=True) | |
if SubProcess.returncode == 0: | |
BranchName = SubProcess.stdout | |
return True, BranchName.decode().lstrip().rstrip() | |
else: | |
return False, None | |
def GetTrackedRemote(): | |
SubProcess = subprocess.run(["git", "rev-parse", "--symbolic-full-name", "--abbrev-ref", "@{u}"], capture_output=True) | |
if SubProcess.returncode == 0: | |
RemoteName = SubProcess.stdout | |
return True, RemoteName.decode().lstrip().rstrip() | |
else: | |
return False, None | |
def GetUnpushedCommits(Remote, Branch): | |
SubProcess = subprocess.run(["git", "cherry", "{}/{}".format(Remote, Branch)], capture_output=True) | |
if SubProcess.returncode == 0: | |
Commits = SubProcess.stdout.split(b"\n") | |
CommitList = [Commit.decode()[2:] for Commit in Commits if len(Commit) > 0] | |
return True, CommitList | |
else: | |
return False, [] | |
def GetModifiedFilesForCommit(CommitHash): | |
print(CommitHash) | |
SubProcess = subprocess.run(["git", "diff-tree", "--no-commit-id", "--name-only", "{}".format(CommitHash), "-r"], capture_output=True) | |
if SubProcess.returncode == 0: | |
Commits = SubProcess.stdout.split(b"\n") | |
CommitList = [Commit.decode() for Commit in Commits if len(Commit) > 0] | |
return True, CommitList | |
else: | |
return False, [] | |
def ParseRemoteAndBranchFromTarget(Target): | |
Parts = Target.split("/") | |
if Parts[0] == "ref": | |
# Ignore | |
return True, Parts[1], Parts[2] | |
elif len(Parts) == 2: | |
return True, Parts[0], Parts[1] | |
else: | |
return False, None, None | |
# First, we're going to get the list of all things we are pushing | |
Success, BranchName = GetBranchName() | |
if not Success: | |
exit(-1) | |
Success, CommitTarget = GetTrackedRemote() | |
if not Success: | |
exit(-2) | |
Success, RemoteName, RemoteBranch = ParseRemoteAndBranchFromTarget(CommitTarget) | |
if not Success: | |
exit(-3) | |
# Debugging | |
print("We are on branch: {}. We track {}/{}".format(BranchName, RemoteName, RemoteBranch)) | |
# Get Unpushed Commits | |
Success, Commits = GetUnpushedCommits(RemoteName, RemoteBranch) | |
if not Success: | |
exit(-4) | |
ModifiedFiles = [] | |
if len(Commits) == 0: | |
print("We are already up to date.")# Running 'git push' anyway, but we aren't going to unlock anything.") | |
#GitPush(RemoteName, RemoteBranch) | |
exit(0) | |
else: | |
print("Sorting Commits for ModFileList") | |
for Commit in Commits: | |
CommitSuccess, ModifiedList = GetModifiedFilesForCommit(Commit) | |
if CommitSuccess: | |
ModifiedFiles += ModifiedList | |
# Reset to Single | |
ModifiedFiles = list(set(ModifiedFiles)) | |
print(ModifiedFiles) | |
IndexThreshold = 100 | |
Success, LockJsonData = GetLockedFiles() | |
if Success and "ours" in LockJsonData: | |
UnlockableItems = LockJsonData["ours"] | |
ProcessedUnlocks = [] | |
for Item in UnlockableItems: | |
ProcessedUnlocks.append(Item["path"]) | |
print("Found {} locks held locally".format(len(ProcessedUnlocks))) | |
Waits = [] | |
PushedUnlocks = [] | |
for Item in ProcessedUnlocks: | |
if Item in ModifiedFiles: | |
PushedUnlocks.append(Item) | |
print("Unlocking {} files after push".format(len(PushedUnlocks))) | |
for Item in PushedUnlocks: | |
print("\tUnlocking {}".format(Item)) | |
PushSuccess, _ = GitPush(RemoteName, RemoteBranch) | |
if PushSuccess: | |
NumBatches = ((len(PushedUnlocks) - 1) // IndexThreshold) + 1 | |
for i in range(NumBatches): | |
LowerBound = i * IndexThreshold | |
UpperBound = min((i + 1) * IndexThreshold, len(PushedUnlocks)) | |
print(' '.join(["git", "lfs", "unlock"] + PushedUnlocks[LowerBound:UpperBound])) | |
subproc = subprocess.Popen(["git", "lfs", "unlock"] + PushedUnlocks[LowerBound:UpperBound]) | |
Waits.append(subproc) | |
for Wait in Waits: | |
Wait.wait() | |
else: | |
print("Push failed, aborting unlock") |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment