Skip to content

Instantly share code, notes, and snippets.

@ikus060
Created February 17, 2023 12:36
Show Gist options
  • Save ikus060/c6d4f17d362f1bb577fda3c5149636f4 to your computer and use it in GitHub Desktop.
Save ikus060/c6d4f17d362f1bb577fda3c5149636f4 to your computer and use it in GitHub Desktop.
import gzip
import io
import os
import shutil
import subprocess
import sys
import glob
# Rename all files and folders of an rdiff-backup repository from a legacy
# encoding to UTF-8, and convert the text files in rdiff-backup-data as well.

# Path of the gzip executable, or None when it cannot be found on PATH.
# _open() falls back to the pure-python gzip module when this is None.
_GZIP = shutil.which('gzip')
class WrapClose:
    """
    Helper for _open() -- a proxy for a file whose close waits for the process.

    Attribute access and iteration are delegated to the wrapped stream, so the
    object can be used wherever the stream itself would be used.
    """

    def __init__(self, stream, proc):
        """
        :param stream: the pipe of ``proc`` to expose (its stdin or stdout).
        :param proc: the process owning the pipe; it must provide ``stdin``,
            ``stdout`` and ``wait()``.
        """
        self._stream = stream
        self._proc = proc

    def close(self):
        """
        Close the pipes, wait for the process and return its exit status.
        """
        # Closing stdin first lets the child see end-of-input and terminate.
        if self._proc.stdin:
            self._proc.stdin.close()
        returncode = self._proc.wait()
        if self._proc.stdout:
            self._proc.stdout.close()
        return returncode

    def __enter__(self):
        return self

    def __exit__(self, *args):
        """
        Close the stream and fail loudly when the process reported an error.

        :raises subprocess.CalledProcessError: when the process exited with a
            non-zero status and no other exception is already propagating.
        """
        returncode = self.close()
        exc_type = args[0] if args else None
        # Never mask the exception raised inside the `with` body; otherwise a
        # failed child must not go unnoticed, as its output is incomplete.
        if returncode and exc_type is None:
            raise subprocess.CalledProcessError(returncode, getattr(self._proc, 'args', None))

    def __getattr__(self, name):
        # Only called for attributes not found on the proxy itself.
        return getattr(self._stream, name)

    def __iter__(self):
        return iter(self._stream)
def _open(fn, mode):
"""
Wrapper to open a file with or without compression using gzip executable or
pure-python implementation.
"""
compress = fn.endswith(b'.gz')
buffered = io.BufferedReader if 'r' in mode else io.BufferedWriter
# Open file directly if compression is not required
if not compress:
return buffered(open(fn, mode))
# Open file using python gzip if zcat and gzip are not available.
if not _GZIP:
return buffered(gzip.open(fn, mode))
# When available, open file using subprocess gzip for better performance
if 'r' in mode:
proc = subprocess.Popen([b'gzip', b'-cd', fn], stdout=subprocess.PIPE)
return WrapClose(proc.stdout, proc)
else: # wb
proc = subprocess.Popen([b'gzip'], stdin=subprocess.PIPE, stdout=open(fn, mode))
return WrapClose(proc.stdin, proc)
def _walk(path):
for filename in os.listdir(path):
fullpath = os.path.join(path, filename)
if os.path.isdir(fullpath) and not os.path.islink(fullpath):
for p in _walk(fullpath):
yield p
yield fullpath
def main(repo_path, source, target, dry_run):
    """
    Convert the file names and the rdiff-backup-data files of a repository
    from one encoding to another.

    :param repo_path: path of the rdiff-backup repository (str, bytes or path-like).
    :param source: name of the codec the names and files are currently in.
    :param target: name of the codec to convert to.
    :param dry_run: when true, nothing is renamed or replaced; the converted
        rdiff-backup-data files are still generated, then deleted.
    :raises ValueError: when ``repo_path`` has no ``rdiff-backup-data`` folder.
    """
    # Make sure the path uses bytes
    repo_path = os.fsencode(repo_path)
    # Make sure it's a rdiff-backup repository
    rdiff_backup_data = os.path.join(repo_path, b'rdiff-backup-data')
    if not os.path.isdir(rdiff_backup_data):
        raise ValueError("%s is not a valid rdiff-backup repository" % repo_path)
    # Walk on all file and folder to rename
    print('renaming files')
    for filename in _walk(repo_path):
        dirname, basename = os.path.split(filename)
        try:
            new = basename.decode(source, errors='surrogateescape').encode(target, errors='surrogateescape')
            if not dry_run and new != basename:
                # `filename` already starts with repo_path; joining repo_path
                # again would duplicate the prefix for a relative repo_path.
                os.rename(filename, os.path.join(dirname, new))
        except Exception as e:
            print('error renaming `%s`' % filename)
            print(e)
    # Loop on all files in rdiff-backup-data
    print('processing rdiff-backup-data')
    for filename in os.listdir(rdiff_backup_data):
        old_file = os.path.join(rdiff_backup_data, filename)
        # Keep the `.gz` suffix last on the temporary name: _open() decides
        # whether to compress from the suffix, and the temporary file replaces
        # the original, so both must use the same format.
        if filename.endswith(b'.gz'):
            new_name = filename[:-3] + b'.new.gz'
        else:
            new_name = filename + b'.new'
        new_file = os.path.join(rdiff_backup_data, new_name)
        try:
            if not os.path.isfile(old_file):
                continue
            with _open(old_file, 'rb') as src, _open(new_file, 'wb') as out:
                out.write(src.read().decode(source, errors='surrogateescape').encode(target, errors='surrogateescape'))
            # Then replace the file.
            if not dry_run:
                os.rename(new_file, old_file)
            else:
                os.unlink(new_file)
        except Exception as e:
            print('error converting `%s`' % filename)
            print(e)
            # Do not leave a partially written temporary file behind.
            try:
                os.unlink(new_file)
            except OSError:
                pass
if __name__ == "__main__":
    # The repository path is the only command line argument. Exit with a
    # usage message instead of an IndexError traceback when it is missing.
    if len(sys.argv) < 2:
        sys.exit("usage: %s REPO_PATH" % sys.argv[0])
    # Convert from CP1252 to UTF-8, for real (dry_run disabled).
    main(sys.argv[1], "CP1252", "utf8", False)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment