Skip to content

Instantly share code, notes, and snippets.

@cmpute
Last active May 7, 2020 16:11
Show Gist options
  • Save cmpute/e1e70fe833ab2213d23049c0092947a4 to your computer and use it in GitHub Desktop.
Save cmpute/e1e70fe833ab2213d23049c0092947a4 to your computer and use it in GitHub Desktop.
Convert tarballs to zip files
#!/usr/bin/env /usr/bin/python3
import argparse
import os
import signal
import tarfile
import time
import zipfile
from itertools import count
from multiprocessing import Pool, Manager
from shutil import copyfileobj
from tqdm import tqdm
def _wrap_func(func, args, pool, nlock):
    """Run ``func`` with an exclusively reserved slot index as first argument.

    A free entry of ``pool`` (value 0) is claimed while holding ``nlock`` and
    its index is passed to ``func`` ahead of ``args``.  The slot is handed back
    by this function itself, whether ``func`` returns or raises, so a failing
    task cannot leak its slot and the process that ran the task can never see
    its own previous slot still marked as taken.

    Args:
        func: Callable invoked as ``func(slot, *args)``.
        args: Extra positional arguments for ``func``.
        pool: Indexable sequence of 0/1 flags, one per slot.
        nlock: Context-manager lock guarding ``pool``.

    Returns:
        The slot index that was reserved for the call.

    Raises:
        RuntimeError: If every slot is already marked as taken.
    """
    with nlock:
        n = next((i for i, v in enumerate(pool) if v == 0), None)
        if n is None:
            raise RuntimeError("no free slot left in the number pool")
        pool[n] = 1
    try:
        func(n, *args)
    finally:
        # Release here rather than in a parent-side callback: success
        # callbacks do not run for failed tasks and may run late.
        with nlock:
            pool[n] = 0
    return n


class NumberPool:
    """Process pool that gives every running task a distinct slot number.

    Slot numbers range over ``0 .. nworkers - 1`` and are passed to the task
    as its first argument (``convert`` uses it as the progress bar position).
    """

    def __init__(self, nworkers, *args, **kargs):
        """Start ``nworkers`` worker processes plus the shared slot table.

        Each worker is initialised with the parent's tqdm lock.  Extra
        arguments are forwarded to ``multiprocessing.Pool``.
        """
        self._ppool = Pool(nworkers, initializer=tqdm.set_lock,
                           initargs=(tqdm.get_lock(),), *args, **kargs)
        # A single manager process serves both the slot table and its lock.
        self._manager = Manager()
        self._npool = self._manager.Array('B', [0] * nworkers)
        self._nlock = self._manager.Lock()

    def apply_async(self, func, args=(), callback=None):
        """Schedule ``func(slot, *args)`` on the pool.

        Args:
            func: Picklable callable taking the slot index first.
            args: Remaining positional arguments for ``func``.
            callback: Optional callable run in the parent process with the
                slot index once the task has finished successfully.

        Returns:
            The ``AsyncResult`` produced by the underlying pool.  Exceptions
            raised by the task are printed through the error callback.
        """
        return self._ppool.apply_async(
            _wrap_func,
            (func, args, self._npool, self._nlock),
            callback=callback,
            error_callback=lambda x: print(x))

    def close(self):
        """Stop accepting new tasks."""
        self._ppool.close()

    def join(self):
        """Wait until all worker processes have exited."""
        self._ppool.join()
def convert_name(tarname):
    """Return the zip file name that corresponds to a tarball name.

    Everything from the last ``.tar`` or ``.tgz`` marker onwards is replaced
    by ``.zip`` (``data.tar.gz`` -> ``data.zip``).  A name containing neither
    marker is kept whole and simply gets ``.zip`` appended.
    """
    tarloc = max(tarname.rfind(".tar"), tarname.rfind(".tgz"))
    if tarloc < 0:
        # rfind() returned -1 for both markers; slicing with -1 would
        # silently chop the last character off the name.
        return tarname + ".zip"
    return tarname[:tarloc] + ".zip"
def _make_skip_check(skip, pattern):
    """Build the predicate that tells whether a member name is skipped."""
    if skip == 0:
        return lambda name: False
    if skip == 1:
        import fnmatch
        return lambda name: fnmatch.fnmatch(name, pattern)
    if skip == 2:
        import re
        matcher = re.compile(pattern)
        return lambda name: matcher.match(name) is not None
    raise ValueError("unknown skip mode: {!r}".format(skip))


def convert(pindex, tarname, zipname, compression, skip, pattern, remove):
    """Copy the regular files of one tarball into a new zip archive.

    Args:
        pindex: Row (tqdm ``position``) used for this archive's progress bar.
        tarname: Path of the tarball to read; compression is auto-detected.
        zipname: Path of the zip file to create (overwritten if present).
        compression: ``zipfile`` compression constant for the output.
        skip: 0 = keep every member, 1 = skip members matching the wildcard
            ``pattern``, 2 = skip members matching the regular expression
            ``pattern``.
        pattern: Pattern used by skip modes 1 and 2, otherwise ignored.
        remove: Delete the tarball after a successful conversion.

    Raises:
        ValueError: If ``skip`` is not 0, 1 or 2.
    """
    arname = os.path.basename(tarname)
    check = _make_skip_check(skip, pattern)
    tarsize = os.path.getsize(tarname)

    # The tarball is opened by hand so that the number of bytes consumed from
    # disk is known; that is the quantity the bar's total (the file size) is
    # expressed in.  Context managers close everything on errors as well.
    with open(tarname, "rb") as raw, \
            tarfile.open(fileobj=raw, mode='r|*') as tarf:
        with zipfile.ZipFile(file=zipname, mode='w', compression=compression,
                             allowZip64=True) as zipf, \
                tqdm(desc=arname, total=tarsize, position=pindex, unit="B",
                     leave=False) as pbar:
            consumed = 0
            for member in tarf:
                position = raw.tell()
                pbar.update(position - consumed)
                consumed = position

                fname = member.name
                if check(fname):
                    continue
                fin = tarf.extractfile(member)
                if fin is None:
                    # directories, devices, ... carry no data stream
                    continue
                with fin, zipf.open(fname, "w") as fout:
                    copyfileobj(fin, fout)
            # account for the trailing blocks of the archive
            pbar.update(tarsize - consumed)

    # only reached when the whole conversion succeeded
    if remove:
        os.remove(tarname)
def main(options):
    """Convert a single tarball, or every tarball of a directory, to zip.

    Args:
        options: Parsed command-line namespace providing ``input``,
            ``output``, ``directory``, ``compression``, ``wildcard``,
            ``regex``, ``workers`` and ``remove``.

    Raises:
        ValueError: If ``options.compression`` is not a known method name.
    """
    # skip matching: a wildcard takes precedence over a regular expression
    if options.wildcard is not None:
        skip, pattern = 1, options.wildcard
    elif options.regex is not None:
        skip, pattern = 2, options.regex
    else:
        skip, pattern = 0, None

    # parse compression option, rejecting unknown names up front instead of
    # failing later on an unbound variable
    methods = {
        'none': zipfile.ZIP_STORED,
        'zlib': zipfile.ZIP_DEFLATED,
        'bz2': zipfile.ZIP_BZIP2,
        'lzma': zipfile.ZIP_LZMA,
    }
    try:
        zc = methods[options.compression]
    except KeyError:
        raise ValueError("unsupported compression method: {!r}".format(
            options.compression)) from None

    if options.directory:
        doutput = options.output or options.input
        # List the input first so that a missing input directory is reported
        # rather than being created by the makedirs() call below.
        names = os.listdir(options.input)
        os.makedirs(doutput, exist_ok=True)

        # Use multiprocessing
        pool = NumberPool(options.workers)
        try:
            for fname in names:
                if not fname.endswith((".tar", ".tgz", ".tar.gz", ".tar.bz2")):
                    continue
                pool.apply_async(convert, (os.path.join(options.input, fname),
                                           os.path.join(doutput, convert_name(fname)),
                                           zc, skip, pattern, options.remove))
        finally:
            # always let the workers drain and exit, even if scheduling failed
            pool.close()
            pool.join()
    else:
        # parse input/output name
        tarname = options.input
        zname = options.output or convert_name(tarname)
        # do convert
        convert(0, tarname, zname, zc, skip, pattern, options.remove)
def build_parser():
    """Create the command-line parser of the tarball-to-zip converter."""
    parser = argparse.ArgumentParser(description='Convert tarballs to zip files.')
    parser.add_argument('input', type=str,
                        help='Input file (in .tar format) or directory')
    parser.add_argument('-d', '--directory', action='store_true',
                        help="Regard input and output path as directory")
    parser.add_argument('-o', '--output', type=str,
                        help='Output file (in .zip format) or directory. If not provided, it will be the same as input')
    parser.add_argument('-c', '--compression', choices=['none', 'zlib', 'bz2', 'lzma'], default='zlib',
                        help="Compression method for the zip file")
    parser.add_argument('-sw', '--skip-wild-card', type=str, dest="wildcard",
                        help="Skip files with wild-card patterns")
    parser.add_argument('-sr', '--skip-regex', type=str, dest="regex",
                        help="Skip files with regular expression")
    parser.add_argument('-p', '--parallel-workers', type=int, dest="workers", default=1,
                        help="Number of parallel workers if converting directory")
    parser.add_argument('-r', '--remove', action='store_true',
                        help="Remove original tar files after conversion")
    return parser


if __name__ == "__main__":
    main(build_parser().parse_args())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment