Skip to content

Instantly share code, notes, and snippets.

@junorouse
Forked from iamahuman/kalzip
Created October 15, 2018 12:11
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
Star You must be signed in to star a gist
Save junorouse/30c8eb5f362e4191e3abccdf84339330 to your computer and use it in GitHub Desktop.
Create and extract ZIP archives with KS X 1001 (EUC-KR / MS949) encoded filenames
#!/usr/bin/env python
import zipfile
import os
import stat
import sys
import codecs
import time
import datetime
import unicodedata
import sys
import pytz
import shutil
# Python 2 / Python 3 compatibility
#
# text_type / binary_type: the interpreter's text and byte-string types.
# sep_u / slash_u:         os.sep and '/' as text.
# slash_b / dot_b:         '/' and '.' as byte strings.
# to_str:                  coerce a value to the native ``str`` type.
if sys.version_info < (3,):
    text_type = unicode  # noqa: F821 -- this branch only runs on Python 2
    binary_type = str
    sep_u = os.sep.decode()
    slash_u = '/'.decode()
    slash_b, dot_b = '/', '.'

    def to_str(x):
        """Return *x* unchanged (byte strings are the native str on Python 2)."""
        return x
else:
    text_type = str
    binary_type = bytes
    sep_u = os.sep
    slash_u = '/'
    slash_b, dot_b = bytes((0x2f,)), bytes((0x2e,))

    def to_str(x):
        """Decode *x* with the default codec if it is bytes, else return it."""
        return x.decode() if isinstance(x, bytes) else x
# ZIP timestamp utility functions
def timestamp_to_zip(tsval, tz=pytz.UTC):
    """Convert a POSIX timestamp to a ZIP ``date_time`` tuple.

    :param tsval: seconds since the Unix epoch.
    :param tz: timezone in which the wall-clock fields are expressed.
    :returns: ``(year, month, day, hour, minute, second)``; month and day
        are one-based, as ``zipfile.ZipInfo.date_time`` expects.
    """
    dt = datetime.datetime.fromtimestamp(tsval, tz=pytz.UTC).astimezone(tz)
    # The day is stored as-is: the ZIP day-of-month field is one-based just
    # like datetime's, and an extra +1 would turn the 31st into 32, which
    # does not fit the 5-bit DOS day field.
    return (dt.year, dt.month, dt.day, dt.hour, dt.minute, dt.second)


def zip_to_timestamp(tup, tz=pytz.UTC):
    """Convert a ZIP ``date_time`` tuple back to a POSIX timestamp.

    :param tup: ``(year, month, day, hour, minute, second)`` wall-clock
        fields, interpreted in *tz*.
    :param tz: timezone of the wall-clock fields.
    :returns: seconds since the Unix epoch, as a float.
    :raises ValueError: if the fields do not form a valid date/time.
    """
    year, month, day, hour, minute, second = tup
    naive = datetime.datetime(year, month, day, hour, minute, second)
    # pytz zones must be attached with localize(); passing them as tzinfo=
    # selects the zone's first recorded offset (local mean time) instead of
    # the offset in force at that date.
    localize = getattr(tz, 'localize', None)
    if localize is not None:
        dt = localize(naive)
    else:
        dt = naive.replace(tzinfo=tz)
    return (dt - datetime.datetime(1970, 1, 1, tzinfo=pytz.UTC)).total_seconds()


# Import-time sanity check: the two conversions must round-trip.
assert zip_to_timestamp(timestamp_to_zip(112462320, pytz.UTC), pytz.UTC) == 112462320
class Kalzip(object):
    """Create and extract ZIP archives with legacy-encoded entry names.

    Entry names are stored in ``encnam_zip`` (cp949 by default).  Small
    files whose extension is in ``text_exts`` are additionally transcoded
    between ``enctxt_native`` and ``enctxt_zip``; anything that cannot be
    transcoded is stored or extracted verbatim.
    """

    TEXT_EXTS = frozenset(('.txt', '.log', '.ini', '.c', '.h',
                           '.cpp', '.hpp', '.cc', '.hh', '.py', '.java',
                           '.htm', '.html', '.css', '.js', '.vbs', '.vb'))

    def __init__(self):
        object.__init__(self)
        # Encoding used for byte-string paths on the local filesystem.
        self.encnam_native = sys.getfilesystemencoding()
        # Encoding of entry names inside the archive.
        self.encnam_zip = 'cp949'
        # Decompose extracted names to NFD on macOS.
        self.name_use_nfd = sys.platform == 'darwin'
        # Extensions whose content is transcoded as text.
        self.text_exts = self.TEXT_EXTS
        # Upper bound (bytes) on files transcoded as text when archiving.
        self.text_max_filesize = 131072
        # Text content encoding on disk and inside the archive.
        self.enctxt_native = 'utf-8'
        self.enctxt_zip = 'cp949'
        # Timezone in which archive timestamps are expressed.
        self.timezone = pytz.timezone('Asia/Seoul')

    def timestamp_to_zip(self, tsval):
        """Convert a POSIX timestamp to a ZIP date_time tuple in self.timezone."""
        return timestamp_to_zip(tsval, self.timezone)

    def zip_to_timestamp(self, tup):
        """Convert a ZIP date_time tuple in self.timezone to a POSIX timestamp."""
        return zip_to_timestamp(tup, self.timezone)

    def native_path_to_zip(self, path, append_slash=False):
        """
        Converts native filesystem path (text or bytes) to ZIP filename.

        The path is normalised, separators become '/', the text is composed
        to NFC and encoded with ``encnam_zip`` (unencodable characters are
        replaced).  Returns bytes.  Raises TypeError for other input types.
        """
        if isinstance(path, text_type):
            path_u = path
        elif isinstance(path, binary_type):
            path_u = codecs.decode(path, self.encnam_native, 'replace')
        else:
            raise TypeError("path must be %s or %s" % (text_type.__name__, binary_type.__name__))
        path_n = os.path.normpath(path_u).replace(sep_u, slash_u)
        path_n = unicodedata.normalize('NFC', path_n)
        if append_slash:
            path_n += slash_u
        return codecs.encode(path_n, self.encnam_zip, 'replace')

    def zip_path_to_native(self, path, do_encode=False):
        """
        Converts ZIP filename (bytes) to native filesystem path.

        Returns text, or bytes in ``encnam_native`` when *do_encode* is true.
        Raises TypeError if *path* is not a byte string.
        """
        if not isinstance(path, binary_type):
            raise TypeError("path must be %s" % (binary_type.__name__,))
        path_n = self._zip_text_to_native(
            codecs.decode(path, self.encnam_zip, 'replace'))
        return (codecs.encode(path_n, self.encnam_native)
                if do_encode else path_n)

    def _zip_text_to_native(self, name_u):
        """Turn a decoded, '/'-separated entry name into a native text path."""
        if self.name_use_nfd:
            name_u = unicodedata.normalize('NFD', name_u)
        return name_u.replace(slash_u, sep_u)

    def _zip_entry_name(self, zinfo):
        """Return the entry name of *zinfo* as text, decoded with encnam_zip."""
        name = zinfo.filename
        if isinstance(name, binary_type):
            # Raw name bytes, as Python 2's zipfile returns them.
            return codecs.decode(name, self.encnam_zip, 'replace')
        if zinfo.flag_bits & 0x800:
            # The UTF-8 flag is set, so zipfile decoded the name correctly.
            return name
        # Without the UTF-8 flag zipfile decodes names as cp437; undo that
        # to recover the stored bytes, then decode them as encnam_zip.
        try:
            raw = name.encode('cp437')
        except UnicodeEncodeError:
            return name
        return codecs.decode(raw, self.encnam_zip, 'replace')

    def _source_path(self, indir, fname):
        """Return the on-disk path of *fname*, relative to *indir* if given."""
        return os.path.join(indir, fname) if indir is not None else fname

    def _checked_destination(self, outdir, rel_path):
        """Join *rel_path* onto *outdir*, refusing names that escape it.

        Raises ValueError for absolute paths, drive-qualified paths and
        paths that climb above *outdir* through '..' components.
        """
        norm = os.path.normpath(rel_path)
        climbs_out = (norm == os.pardir or
                      norm.startswith(os.pardir + os.sep))
        if climbs_out or os.path.isabs(norm) or os.path.splitdrive(norm)[0]:
            raise ValueError("unsafe path in archive: %r" % (rel_path,))
        return os.path.join(outdir, norm)

    def _ensure_dir(self, path):
        """Create *path* and its parents; return True if it was created."""
        if os.path.isdir(path):
            return False
        try:
            os.makedirs(path)
        except OSError:
            # Only a directory that appeared in the meantime is acceptable.
            if not os.path.isdir(path):
                raise
            return False
        return True

    def _apply_mtime(self, path, zinfo):
        """Set the mtime of *path* from the entry's archive timestamp."""
        mtime = self.zip_to_timestamp(zinfo.date_time)
        os.utime(path, (time.time(), mtime))

    def _read_text_for_zip(self, src_name):
        """Read *src_name* as enctxt_native text, re-encoded to enctxt_zip.

        Returns the encoded bytes, or None when the file is not valid text
        in those encodings or the result exceeds ``text_max_filesize``.
        """
        encoder = codecs.getincrementalencoder(self.enctxt_zip)('strict')
        encbuf = bytearray()
        try:
            with codecs.open(src_name, "r", encoding=self.enctxt_native) as cf:
                while True:
                    buf = cf.read(1024)
                    is_eof = len(buf) == 0
                    encbuf.extend(encoder.encode(buf, is_eof))
                    if len(encbuf) > self.text_max_filesize:
                        return None
                    if is_eof:
                        break
        except UnicodeError:
            return None
        return bytes(encbuf)

    def add_entry(self, zf, indir, fname):
        """Add the single file or directory *fname* to the open archive *zf*.

        *fname* is also the name recorded in the archive; *indir* (may be
        None) is the directory it is resolved against on disk.  Does not
        recurse into directories.
        """
        # NOTE(review): native_path_to_zip returns bytes, and Python 3's
        # zipfile appears to require str entry names, so archive creation
        # probably only works on Python 2 -- confirm.
        src_name = self._source_path(indir, fname)
        stat_obj = os.stat(src_name)
        _, ext = os.path.splitext(fname)
        is_text = (to_str(ext) in self.text_exts and
                   stat.S_ISREG(stat_obj.st_mode) and
                   stat_obj.st_size <= self.text_max_filesize)
        is_dir = stat.S_ISDIR(stat_obj.st_mode)
        zpath = self.native_path_to_zip(fname, append_slash=is_dir)
        if is_text:
            payload = self._read_text_for_zip(src_name)
            if payload is not None:
                zi = zipfile.ZipInfo(zpath, date_time=self.timestamp_to_zip(stat_obj.st_mtime))
                zi.external_attr = 0o644 << 16  # -rw-r--r--(Unix)
                zi.compress_type = zipfile.ZIP_DEFLATED
                zf.writestr(zi, payload)
                return
            # Not transcodable: fall through and store the file verbatim.
        if is_dir:
            zi = zipfile.ZipInfo(zpath, date_time=self.timestamp_to_zip(stat_obj.st_mtime))
            zi.external_attr = ((0o40755 << 16) | 0x10)  # drwxr-xr-x(Unix) DIR(MSDOS)
            zi.compress_type = zipfile.ZIP_STORED
            zf.writestr(zi, b'')
        else:
            # NOTE(review): ZipFile.write() stamps the entry itself and does
            # not go through self.timezone -- confirm the timestamps agree
            # with the text/directory entries above.
            zf.write(src_name, arcname=zpath, compress_type=zipfile.ZIP_DEFLATED)

    def extract_entry(self, zf, outdir, zinfo):
        """Extract the entry *zinfo* of the open archive *zf* below *outdir*.

        Directory entries are created, text entries are transcoded from
        ``enctxt_zip`` to ``enctxt_native`` when possible, and everything
        else is copied verbatim.  The entry's timestamp becomes the mtime.

        Raises ValueError if the entry name would land outside *outdir*.
        """
        name_u = self._zip_entry_name(zinfo)
        is_dir = name_u.endswith(slash_u)
        comp = name_u.rsplit('.', 1)
        is_text = (not is_dir and len(comp) > 1 and
                   ('.' + comp[-1]) in self.text_exts)
        dst_name = self._checked_destination(
            outdir, self._zip_text_to_native(name_u))

        if is_dir:
            # Only stamp directories this call created, leaving existing
            # directories' timestamps alone.
            if self._ensure_dir(dst_name):
                self._apply_mtime(dst_name, zinfo)
            return

        parent = os.path.dirname(dst_name)
        if parent:
            self._ensure_dir(parent)

        txtdata = None
        if is_text:
            try:
                txtdata = codecs.encode(
                    codecs.decode(zf.read(zinfo),
                                  self.enctxt_zip, 'strict'),
                    self.enctxt_native, 'strict')
            except UnicodeError:
                # Not valid text in the archive encoding: copy verbatim.
                txtdata = None

        with open(dst_name, "wb") as bf:
            if txtdata is not None:
                bf.write(txtdata)
            else:
                with zf.open(zinfo, 'r') as af:
                    shutil.copyfileobj(af, bf)
        self._apply_mtime(dst_name, zinfo)

    def add_entry_rec(self, zf, indir, fname):
        """Add *fname* and, for directories, everything below it to *zf*.

        Symbolic links are skipped.  Paths are resolved against *indir*,
        the same way add_entry resolves them.
        """
        src_name = self._source_path(indir, fname)
        if os.path.islink(src_name):
            return
        self.add_entry(zf, indir, fname)
        if os.path.isdir(src_name):
            # Sorted so the archive layout does not depend on listdir order.
            for child in sorted(os.listdir(src_name)):
                self.add_entry_rec(zf, indir, os.path.join(fname, child))

    def zip(self, outname, indir, files):
        """Create archive *outname* from *files*, resolved against *indir*."""
        with zipfile.ZipFile(outname, 'w') as zf:
            for request in files:
                self.add_entry_rec(zf, indir, request)

    def unzip(self, zipname, outdir):
        """Extract every entry of archive *zipname* below *outdir*."""
        with zipfile.ZipFile(zipname, 'r') as zf:
            for zinfo in zf.infolist():
                self.extract_entry(zf, outdir, zinfo)
if __name__ == '__main__':
    # Command line:
    #   a ARCHIVE [FILE ...]   add files (relative to the cwd) to ARCHIVE
    #   x ARCHIVE              extract ARCHIVE into the cwd
    if len(sys.argv) < 3 or sys.argv[1] not in ('a', 'x'):
        # Report bad invocations as a usage error instead of a traceback.
        sys.stderr.write(
            "usage: %s a ARCHIVE [FILE ...]\n"
            "       %s x ARCHIVE\n" % (sys.argv[0], sys.argv[0]))
        sys.exit(2)
    k = Kalzip()
    if sys.argv[1] == 'a':
        k.zip(sys.argv[2], '.', sys.argv[3:])
    else:
        k.unzip(sys.argv[2], '.')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment