Created
February 11, 2014 00:40
-
-
Save iamgreaser/8927214 to your computer and use it in GitHub Desktop.
ztrip.py: tool which currently strips + merges zip files
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python2
# zip file stripper
# by Ben "GreaseMonkey" Russell, 2014
# for ALSO
# licensing stuff will need to be sorted out so don't spread this too much
import struct
import sys
import zlib

# "whence" value for file.seek() meaning "relative to the end of the file"
# (the same value as os.SEEK_END).
SEEK_END = 2
class FileEnt(object):
    """One archive member: its contents plus its modification stamp.

    Attributes:
    data        -- the member's uncompressed contents
    lastmodtime -- last-modified time, kept as the raw 16-bit header field
    lastmoddate -- last-modified date, kept as the raw 16-bit header field
    """

    def __init__(self, data, lastmodtime, lastmoddate):
        """Store the three values unchanged; nothing is validated here."""
        self.data = data
        self.lastmodtime = lastmodtime
        self.lastmoddate = lastmoddate
class ZipFile(object):
    """A zip file object which can be manipulated.

    ``self.files`` maps each member name (a byte string, exactly as stored in
    the archive) to a ``FileEnt`` holding the uncompressed data and the raw
    last-modified time/date fields.  Only single-disk archives using
    compression methods 0 (store) and 8 (deflate) can be loaded.
    """

    # Fixed-size parts of the on-disk records, each WITHOUT its 4-byte magic.
    # Central Directory header: vermade, verneeded, flags, method, time, date,
    # crc32, cmpsize, uncmpsize, fnlen, eflen, fclen, diskstart, intfa, extfa,
    # local header offset.
    _CD_HEADER = struct.Struct("<HHHHHHIIIHHHHHII")
    # Local header: verneeded, flags, method, time, date, crc32, cmpsize,
    # uncmpsize, fnlen, eflen.
    _LOCAL_HEADER = struct.Struct("<HHHHHIIIHH")
    # End-of-central-directory record size, excluding the trailing comment.
    _EOCD_SIZE = 22
    # The comment length is a 16-bit field, so this is the longest comment.
    _MAX_COMMENT = 0xFFFF

    def __init__(self, fname=None):
        """Opens a zip file from a file name or file object.

        fname -- None to create an empty archive; an object with a ``read``
                 method (it must also support seek/tell) to load from it; or
                 anything else, which is treated as a path and opened in
                 binary mode.

        A file opened here is closed again before returning; a file object
        supplied by the caller is left open.  Raises Exception when the
        archive cannot be parsed.
        """
        self.files = {}
        if fname is None:
            return

        if hasattr(fname, "read"):
            self._load(fname)
            return

        fp = open(fname, "rb")
        try:
            self._load(fp)
        finally:
            # Everything is held in memory afterwards, so don't leak the handle.
            fp.close()

    def _load(self, fp):
        """Locates the central directory in fp and loads every member."""
        entcount, ziplen = self._find_central_dict(fp)
        self._load_central_dict(fp, entcount, ziplen)

    def _find_central_dict(self, fp):
        """Finds the central dictionary from the end of file.

        Returns the (entry count, archive length) pair produced by
        _try_this_central_dict and leaves fp at the start of the central
        directory.  Raises Exception("EOF signature not found") when no
        usable end-of-central-directory record exists.
        """
        fp.seek(0, SEEK_END)
        fsize = fp.tell()

        # The record needs _EOCD_SIZE bytes before the comment, so never probe
        # further back than the file can hold (this also keeps seek() from
        # being asked to move before the start of a small file).
        maxcom = min(self._MAX_COMMENT, fsize - self._EOCD_SIZE)

        # Keep going until we match something that looks like the comment
        # length: the 16-bit field just before a comment of comoffs bytes.
        for comoffs in range(maxcom + 1):
            fp.seek(-2 - comoffs, SEEK_END)
            comlen, = struct.unpack("<H", fp.read(2))
            if comlen == comoffs:
                ret = self._try_this_central_dict(fp, comoffs)
                if ret is not False:
                    return ret

        raise Exception("EOF signature not found")

    def _try_this_central_dict(self, fp, comoffs):
        """
        Attempt to read the EOF signature from a given position.

        comoffs is the assumed length of the zip file comment, i.e. the
        distance from the end of the file back to the start of the comment.

        Returns False if no acceptable record is found there.  Otherwise
        returns (total entry count, archive length), where the archive length
        is central directory offset + central directory size + record size +
        comment length, and leaves fp at the start of the central directory.
        """
        fp.seek(0, SEEK_END)
        fsize = fp.tell()
        tail = self._EOCD_SIZE + comoffs
        if tail > fsize:
            return False

        # Seek and check signature
        fp.seek(-tail, SEEK_END)
        if fp.read(4) != b"PK\x05\x06":
            return False

        # Disk bookkeeping, then where the central directory lives.
        diskidx, diskcdidx, cddisklen, cdtotlen = struct.unpack(
            "<HHHH", fp.read(8))
        cdsize, cdoffs = struct.unpack("<II", fp.read(8))

        # We're just going to assume that they only ever use up one disk.
        if diskidx != 0 or diskcdidx != 0 or cddisklen != cdtotlen:
            return False

        # A directory larger than what precedes the record cannot be real.
        if tail + cdsize > fsize:
            return False

        # Move to the start of the central directory.
        fp.seek(-tail - cdsize, SEEK_END)
        return cdtotlen, (cdoffs + cdsize + tail)

    def _load_central_dict(self, fp, entcount, ziplen):
        """Loads the central directory from the current position of fp.

        entcount -- number of central directory entries to read
        ziplen   -- archive length as returned by _try_this_central_dict;
                    local header offsets are resolved against
                    (end of file - ziplen)

        Replaces self.files.  Members whose name contains ".DS_Store" or
        "__MACOSX/" are skipped.  Raises Exception on a bad header magic, an
        unsupported compression method or a CRC32 mismatch.
        """
        self.files = {}

        for i in range(entcount):
            # Read the Central Directory magic.
            if fp.read(4) != b"PK\x01\x02":
                raise Exception("Central Directory header incorrect for file %i/%i"
                    % (i+1, entcount))

            (_vermade, verneeded, _flags, cm, lastmodtime, lastmoddate,
             crc32, cmpsize, _uncmpsize, fnlen, eflen, fclen,
             _diskstart, _intfa, _extfa, lhoffs) = self._CD_HEADER.unpack(
                fp.read(self._CD_HEADER.size))
            fname = fp.read(fnlen)
            # The extra field and file comment are not used; step over them.
            fp.read(eflen + fclen)

            if fname.find(b".DS_Store") >= 0 or fname.find(b"__MACOSX/") >= 0:
                print("Skipping %s" % (repr(fname),))
                continue

            # Do some checks.
            if not (verneeded == 20 or (verneeded < 20 and cm == 0)
                    or (verneeded == 10 and cm == 8)):
                # Apple's zip utility always sets the version needed to 1.0,
                # even when deflate / directories are used.
                # THIS IS COMPLETELY INCORRECT.
                print("File %i %s" % (cm, repr(fname)))
                print("Zip version %i.%i" % (verneeded//10, verneeded%10, ))
                print("WARNING: Zip version != 2.0 found. This might not unpack correctly!")

            # We'll be right back after this jump.
            tempfpos = fp.tell()
            fp.seek((-ziplen) + lhoffs, SEEK_END)

            # Check if we have a local header.
            if fp.read(4) != b"PK\x03\x04":
                raise Exception("Local header not found for file %i/%i"
                    % (i+1, entcount))

            # Read the local header.
            # TODO: check the integrity of these against the Central Directory
            # - we're just using what the CD tells us for now
            (_lhverneeded, _lhflags, _lhcm, _lhtime, _lhdate,
             lhcrc, _lhcmpsize, _lhuncmpsize, lhfnlen, lheflen,
             ) = self._LOCAL_HEADER.unpack(fp.read(self._LOCAL_HEADER.size))
            # Step over the local copy of the name and the local extra field.
            fp.read(lhfnlen + lheflen)

            # Unpack.
            udata = fp.read(cmpsize)
            if cm == 0:
                # Method 0: Store
                pass
            elif cm == 8:
                # Method 8: Deflate (negative wbits = raw stream, no zlib wrapper)
                udata = zlib.decompress(udata, -15)
            else:
                raise Exception("Compression method %i not supported" % cm)

            # Check the CRC32 for corruption.  The mask makes the value
            # unsigned on every Python version, matching the "<I" header field.
            calccrc = zlib.crc32(udata) & 0xFFFFFFFF
            if calccrc != crc32:
                print("%i %i %i" % (calccrc, lhcrc, crc32))
                raise Exception("CRC32 mismatch")

            # Drop it in.
            self.files[fname] = FileEnt(udata, lastmodtime, lastmoddate)

            # Aaaaand we're back!
            fp.seek(tempfpos)

    def write(self, fname):
        """Writes a zip file to a file name or file object.

        fname -- an object with a ``write`` method (it must also support
                 tell), or anything else, which is treated as a path and
                 opened for binary writing.

        A file opened here is always closed, even on error; a file object
        supplied by the caller is left open.  Raises
        Exception("Cannot write to nowhere") when fname is None.
        """
        if fname is None:
            raise Exception("Cannot write to nowhere")

        if hasattr(fname, "write"):
            self._write_to(fname)
            return

        fp = open(fname, "wb")
        try:
            self._write_to(fp)
        finally:
            fp.close()

    def _write_to(self, fp):
        """Emits local headers + data, the central directory, then the EOF record."""
        # Get the filenames into a sane order
        names = sorted(self.files)

        # Central directory entries are collected here and joined once.
        cdir = []
        for name in names:
            fent = self.files[name]
            udata = fent.data

            # Default: stored, readable by version 1.0.
            cdata = udata
            cm = 0
            verneeded = 10

            # TODO: Properly determine if this is a directory earlier
            if not name.endswith(b"/"):
                # Compress, then drop the 2-byte zlib header and the 4-byte
                # Adler-32 trailer to leave the raw deflate stream.
                packed = zlib.compress(udata, 9)[2:-4]
                # Only keep the deflated form when it is actually smaller.
                if len(packed) < len(udata):
                    cdata = packed
                    cm = 8
                    verneeded = 20

            # Fields shared verbatim by the local and central headers:
            # flags, method, time, date, crc32, compressed size, real size.
            shared = struct.pack(
                "<HHHHIII", 0, cm, fent.lastmodtime, fent.lastmoddate,
                zlib.crc32(udata) & 0xFFFFFFFF, len(cdata), len(udata))

            # Local header, name and data go straight to the file.
            lhoffs = fp.tell()
            fp.write(b"PK\x03\x04")
            fp.write(struct.pack("<H", verneeded))
            fp.write(shared)
            fp.write(struct.pack("<HH", len(name), 0))
            fp.write(name)
            fp.write(cdata)

            # Matching central directory entry.
            # TODO: file attributes
            cdir.append(b"".join((
                b"PK\x01\x02",
                struct.pack("<HH", 0x0300 + 20, verneeded),
                shared,
                # name length, extra length, comment length, disk start,
                # internal attrs, external attrs, local header offset
                struct.pack("<HHHHHII", len(name), 0, 0, 0, 0, 0, lhoffs),
                name,
            )))

        # Write the Central Directory to the file
        cdiroffs = fp.tell()
        cdirdata = b"".join(cdir)
        fp.write(cdirdata)

        # Now write the EOF signature: disk numbers, entry counts, directory
        # size and offset, and a zero-length comment.
        fp.write(b"PK\x05\x06")
        fp.write(struct.pack("<HHHH", 0, 0, len(names), len(names)))
        fp.write(struct.pack("<II", len(cdirdata), cdiroffs))
        fp.write(struct.pack("<H", 0))
def main(argv=None):
    """Merge the zip files named in argv into "merged.zip".

    argv -- list of zip file paths; defaults to sys.argv[1:].

    Archives are loaded in order and their members copied into one combined
    archive.  When a member name appears more than once, the copy from the
    later archive replaces the earlier one and a notice is printed.  The
    result is written to "merged.zip" in the current directory.
    """
    if argv is None:
        argv = sys.argv[1:]

    zb = ZipFile()
    for path in argv:
        print(path)
        zf = ZipFile(path)
        for name, item in zf.files.items():
            if name in zb.files:
                print("Overwriting %s" % (repr(name),))
            zb.files[name] = item

    zb.write("merged.zip")


# Only run the merge when executed as a script, so that importing this module
# for its classes has no side effects.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment