Last active
May 7, 2020 16:11
-
-
Save cmpute/e1e70fe833ab2213d23049c0092947a4 to your computer and use it in GitHub Desktop.
Convert tarballs to zip files
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env /usr/bin/python3 | |
import argparse | |
import os | |
import signal | |
import tarfile | |
import time | |
import zipfile | |
from itertools import count | |
from multiprocessing import Pool, Manager | |
from shutil import copyfileobj | |
from tqdm import tqdm | |
def _wrap_func(func, args, pool, nlock):
    """Run ``func`` with a free slot number claimed from ``pool``.

    The first index of ``pool`` whose value is ``0`` is marked ``1`` while
    holding ``nlock``; that index is then passed to ``func`` as its first
    positional argument, followed by ``args``.

    Args:
        func: Callable invoked as ``func(n, *args)``; its return value is
            discarded.
        args: Extra positional arguments forwarded to ``func``.
        pool: Indexable sequence of slot flags (``0`` = free, ``1`` = taken).
        nlock: Context-manager lock guarding access to ``pool``.

    Returns:
        The claimed slot index. On success the slot is left marked as taken;
        the caller is expected to release it (``NumberPool.apply_async`` does
        so in its completion callback).

    Raises:
        StopIteration: If no slot in ``pool`` is free.
        Exception: Whatever ``func`` raises, after the slot has been released.
    """
    with nlock:
        n = next(i for i, v in enumerate(pool) if v == 0)
        pool[n] = 1
    try:
        func(n, *args)
    except Exception:
        # The success callback never runs for a failed task, so nobody else
        # would free this slot; release it here before propagating.
        with nlock:
            pool[n] = 0
        raise
    return n
class NumberPool:
    """Process pool that runs each task with a free slot number.

    A shared array of ``nworkers`` flags records which slot numbers are in
    use. Every submitted task is run through ``_wrap_func``, which claims a
    free slot and passes its index to the task as the first argument.
    """

    def __init__(self, nworkers, *args, **kargs):
        """Create the worker pool and the shared slot bookkeeping.

        Args:
            nworkers: Number of worker processes and of slot numbers.
            *args: Extra positional arguments forwarded to ``Pool``.
            **kargs: Extra keyword arguments forwarded to ``Pool``.
        """
        # Workers are initialised with the parent's tqdm lock.
        self._ppool = Pool(nworkers, initializer=tqdm.set_lock,
                           initargs=(tqdm.get_lock(),), *args, **kargs)
        # A single manager process serves both shared objects.
        self._manager = Manager()
        self._npool = self._manager.Array('B', [0] * nworkers)
        self._nlock = self._manager.Lock()

    def apply_async(self, func, args=(), callback=None):
        """Submit ``func(n, *args)`` to the pool, ``n`` being a free slot.

        Args:
            func: Task callable; receives the slot number first.
            args: Extra positional arguments for ``func``.
            callback: Optional callable invoked in the parent process after
                a successful task, with the slot number the task ran in
                (the task's own return value is not propagated).

        Returns:
            The ``AsyncResult`` returned by the underlying pool. Exceptions
            raised by the task are printed by the error callback.
        """
        def _release(n):
            # Free the slot first so it is reusable before user code runs.
            with self._nlock:
                self._npool[n] = 0
            if callback is not None:
                callback(n)

        return self._ppool.apply_async(
            _wrap_func,
            (func, args, self._npool, self._nlock),
            callback=_release,
            error_callback=lambda x: print(x),
        )

    def close(self):
        """Stop accepting new tasks."""
        self._ppool.close()

    def join(self):
        """Wait for the worker processes to exit."""
        self._ppool.join()
def convert_name(tarname):
    """Derive the ``.zip`` file name that corresponds to a tarball name.

    Everything from the last ``.tar`` or ``.tgz`` onwards is replaced by
    ``.zip`` (so ``a.tar.gz`` becomes ``a.zip``). If neither marker occurs,
    the ordinary file extension, if any, is replaced instead.

    Args:
        tarname: Tarball file name or path.

    Returns:
        The name with its tarball suffix swapped for ``.zip``.
    """
    tarloc = max(tarname.rfind(".tar"), tarname.rfind(".tgz"))
    if tarloc < 0:
        # rfind() returned -1 for both markers; slicing with -1 would chop
        # the last character off the name, so strip the extension instead.
        stem = os.path.splitext(tarname)[0]
    else:
        stem = tarname[:tarloc]
    return stem + ".zip"
def convert(pindex, tarname, zipname, compression, skip, pattern, remove):
    """Copy the file members of a tarball into a new zip archive.

    Args:
        pindex: Row position of this conversion's tqdm progress bar.
        tarname: Path of the tarball to read (opened as a stream, any
            compression tarfile can detect).
        zipname: Path of the zip file to create (overwritten if present).
        compression: ``zipfile`` compression constant for the output.
        skip: Member filter mode: ``0`` keeps everything, ``1`` skips names
            matching the wildcard ``pattern``, ``2`` skips names matching
            the regular expression ``pattern`` at their start.
        pattern: Wildcard or regular expression used by ``skip``.
        remove: If true, delete the tarball after a successful conversion.

    Raises:
        ValueError: If ``skip`` is not 0, 1 or 2.
    """
    # Build the filter first so a bad mode or pattern fails before any
    # output file is created.
    if skip == 0:
        def check(name):
            return False
    elif skip == 1:
        import fnmatch

        def check(name):
            return fnmatch.fnmatch(name, pattern)
    elif skip == 2:
        import re
        regex = re.compile(pattern)

        def check(name):
            return regex.match(name) is not None
    else:
        raise ValueError("unknown skip mode: %r" % (skip,))

    arname = os.path.split(tarname)[1]
    tarsize = os.path.getsize(tarname)

    # Context managers close all three resources even if copying fails.
    with tarfile.open(name=tarname, mode='r|*') as tarf, \
            zipfile.ZipFile(file=zipname, mode='w', compression=compression,
                            allowZip64=True) as zipf, \
            tqdm(desc=arname, total=tarsize, position=pindex, unit="B",
                 leave=False) as pbar:
        for member in tarf:
            fname = member.name
            if check(fname):
                continue
            fin = tarf.extractfile(member)
            if fin is None:
                # extractfile() gives no file object for this member
                # (e.g. a directory entry); nothing to copy.
                continue
            with zipf.open(fname, "w") as fout:
                copyfileobj(fin, fout)
            # NOTE(review): the bar total is the tarball's size on disk while
            # progress advances by each entry's compressed size in the zip,
            # so the two may not line up exactly.
            pbar.update(zipf.getinfo(fname).compress_size)

    # Only reached when the whole archive was converted and closed.
    if remove:
        os.remove(tarname)
def main(options):
    """Convert one tarball, or every tarball in a directory, to zip.

    Args:
        options: Parsed command-line namespace with the attributes
            ``input``, ``output``, ``directory``, ``compression``,
            ``wildcard``, ``regex``, ``workers`` and ``remove``.

    Raises:
        ValueError: If ``options.compression`` is not a known method name.
    """
    # Skip matching: the wildcard takes precedence over the regex.
    if options.wildcard is not None:
        skip, pattern = 1, options.wildcard
    elif options.regex is not None:
        skip, pattern = 2, options.regex
    else:
        skip, pattern = 0, None

    # Map the compression option onto the zipfile constant.
    methods = {
        'none': zipfile.ZIP_STORED,
        "zlib": zipfile.ZIP_DEFLATED,
        "bz2": zipfile.ZIP_BZIP2,
        "lzma": zipfile.ZIP_LZMA,
    }
    if options.compression not in methods:
        raise ValueError("unknown compression method: %r" % (options.compression,))
    zc = methods[options.compression]

    if options.directory:
        doutput = options.output or options.input
        # List first so a missing input directory still raises here.
        names = os.listdir(options.input)
        if options.output:
            # Without this every worker fails when the directory is missing.
            os.makedirs(options.output, exist_ok=True)
        # Use multiprocessing
        pool = NumberPool(options.workers)
        for fname in names:
            if not fname.endswith((".tar", ".tgz", ".tar.gz", ".tar.bz2")):
                continue
            pool.apply_async(convert, (os.path.join(options.input, fname),
                                       os.path.join(doutput, convert_name(fname)),
                                       zc, skip, pattern, options.remove))
        pool.close()
        pool.join()
    else:
        # Single file: default the output name to the input's zip name.
        tarname = options.input
        zname = options.output or convert_name(tarname)
        convert(0, tarname, zname, zc, skip, pattern, options.remove)
def build_parser():
    """Build the command-line parser for the tarball-to-zip converter.

    Returns:
        An ``argparse.ArgumentParser`` whose parsed namespace provides
        ``input``, ``directory``, ``output``, ``compression``, ``wildcard``,
        ``regex``, ``workers`` and ``remove``.
    """
    parser = argparse.ArgumentParser(description='Convert tarballs to zip files.')
    parser.add_argument('input', type=str,
                        help='Input file (in .tar format) or directory')
    parser.add_argument('-d', '--directory', action='store_true',
                        help="Regard input and output path as directory")
    parser.add_argument('-o', '--output', type=str,
                        help='Output file (in .zip format) or directory. If not provided, it will be the same as input')
    parser.add_argument('-c', '--compression', choices=['none', 'zlib', 'bz2', 'lzma'], default='zlib',
                        help="Compression method for the zip file")
    parser.add_argument('-sw', '--skip-wild-card', type=str, dest="wildcard",
                        help="Skip files with wild-card patterns")
    parser.add_argument('-sr', '--skip-regex', type=str, dest="regex",
                        help="Skip files with regular expression")
    parser.add_argument('-p', '--parallel-workers', type=int, dest="workers", default=1,
                        help="Number of parallel workers if converting directory")
    parser.add_argument('-r', '--remove', action='store_true',
                        help="Remove original tar files after conversion")
    return parser


if __name__ == "__main__":
    main(build_parser().parse_args())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment