Skip to content

Instantly share code, notes, and snippets.

@iamgreaser
Created February 11, 2014 00:40
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save iamgreaser/8927214 to your computer and use it in GitHub Desktop.
Save iamgreaser/8927214 to your computer and use it in GitHub Desktop.
ztrip.py: tool which currently strips + merges zip files
#!/usr/bin/env python2
# zip file stripper
# by Ben "GreaseMonkey" Russell, 2014
# for ALSO
# licensing stuff will need to be sorted out so don't spread this too much
# `whence` value passed to file.seek() so that the offset is interpreted
# relative to the end of the file (same value as os.SEEK_END).
SEEK_END = 2
import struct
import sys
import zlib
class FileEnt(object):
    """One archive member held in memory.

    Attributes:
        data: the member's uncompressed contents.
        lastmodtime: last-modified time, stored exactly as the 16-bit
            field read from / written to the zip headers.
        lastmoddate: last-modified date, stored exactly as the 16-bit
            field read from / written to the zip headers.
    """

    def __init__(self, data, lastmodtime, lastmoddate):
        self.data = data
        self.lastmodtime = lastmodtime
        self.lastmoddate = lastmoddate

    def __repr__(self):
        # Show the payload size rather than the payload itself, which may
        # be arbitrarily large.
        return "%s(<%i bytes>, lastmodtime=%r, lastmoddate=%r)" % (
            type(self).__name__, len(self.data),
            self.lastmodtime, self.lastmoddate)
class ZipFile(object):
    """A zip file object which can be manipulated.

    Attributes:
        files: dict mapping each member name to an entry object exposing
            ``data`` (uncompressed contents), ``lastmodtime`` and
            ``lastmoddate`` (16-bit header fields).  Names loaded from an
            archive are kept as the raw bytes stored in that archive.

    The code sticks to constructs that are valid on both Python 2 and
    Python 3 (bytes literals, single-argument ``print(...)``, ``range``).
    """

    # Record signatures.
    _SIG_LOCAL = b"PK\x03\x04"
    _SIG_CDIR = b"PK\x01\x02"
    _SIG_EOF = b"PK\x05\x06"
    # Size of the EOF record without its trailing comment, and the longest
    # comment its 16-bit length field can describe.
    _EOF_SIZE = 22
    _MAX_COMMENT = 0xFFFF

    def __init__(self, fname=None):
        """Opens a zip file from a file name or file object.

        With ``fname=None`` an empty archive is created.  A file opened
        here from a name is closed again once loading finishes (or fails);
        a file object passed in by the caller is left open.

        Raises:
            Exception: if the archive cannot be parsed (see the loaders).
        """
        self.files = {}
        if fname is None:
            return
        if self._is_path(fname):
            fp = open(fname, "rb")
            try:
                self._load(fp)
            finally:
                fp.close()
        else:
            self._load(fname)

    @staticmethod
    def _is_path(obj):
        """Returns True if obj is a file name rather than a file object."""
        try:
            return isinstance(obj, basestring)
        except NameError:
            # Python 3 has no basestring.
            return isinstance(obj, (str, bytes))

    def _load(self, fp):
        """Locates the central directory in fp, then loads every entry."""
        entcount, ziplen = self._find_central_dict(fp)
        self._load_central_dict(fp, entcount, ziplen)

    def _find_central_dict(self, fp):
        """Finds the central dictionary from the end of file.

        The EOF record ends with a 16-bit comment length followed by the
        comment, so every possible comment length is tried until the
        length field found at that distance from the end agrees with it
        and the record in front of it checks out.

        Returns:
            (entry count, zip length) as produced by
            _try_this_central_dict, with fp left at the start of the
            central directory.

        Raises:
            Exception: "EOF signature not found" if nothing matches,
                including files too short to hold an EOF record.
        """
        fp.seek(0, SEEK_END)
        fsize = fp.tell()
        # Read the only region the record can live in once, instead of
        # seeking for every candidate; this also never seeks before the
        # start of a short file.
        tailsize = min(fsize, self._EOF_SIZE + self._MAX_COMMENT)
        fp.seek(-tailsize, SEEK_END)
        tail = fp.read(tailsize)
        for comoffs in range(len(tail) - self._EOF_SIZE + 1):
            comlen, = struct.unpack_from("<H", tail, len(tail) - 2 - comoffs)
            if comlen != comoffs:
                continue
            ret = self._try_this_central_dict(fp, comoffs)
            if ret is not False:
                return ret
        raise Exception("EOF signature not found")

    def _try_this_central_dict(self, fp, comoffs):
        """
        Attempt to read the EOF signature from a given position.
        Said position points at the beginning of the zip file comment.

        Returns:
            False if this is not a usable EOF record; otherwise
            (total entry count, zip length), where zip length is the
            number of bytes from the archive's own offset 0 to the end of
            the file.  On success fp is left at the start of the central
            directory.
        """
        fp.seek(0, SEEK_END)
        fsize = fp.tell()
        # Seek and check signature
        fp.seek(-self._EOF_SIZE - comoffs, SEEK_END)
        if fp.read(4) != self._SIG_EOF:
            return False
        # Disk bookkeeping, then the central directory size and offset.
        diskidx, diskcdidx, cddisklen, cdtotlen = struct.unpack(
            "<HHHH", fp.read(8))
        cdsize, cdoffs = struct.unpack("<II", fp.read(8))
        # We're just going to assume that they only ever use up one disk.
        if diskidx != 0 or diskcdidx != 0 or cddisklen != cdtotlen:
            return False
        ziplen = cdoffs + cdsize + comoffs + self._EOF_SIZE
        # A record describing more data than the file holds is bogus, and
        # would make the seeks below land before the start of the file.
        if ziplen > fsize:
            return False
        # Move to the start of the central directory. We're sorted.
        fp.seek(-self._EOF_SIZE - comoffs - cdsize, SEEK_END)
        return cdtotlen, ziplen

    def _load_central_dict(self, fp, entcount, ziplen):
        """Loads the central directory from a given offset.

        fp must be positioned at the first central directory header.
        Each entry's data is read through its local header, unpacked and
        CRC-checked, then stored in self.files under its raw name.
        Entries whose name contains ".DS_Store" or "__MACOSX/" are
        skipped.

        Args:
            fp: seekable binary file object.
            entcount: number of central directory entries to read.
            ziplen: distance from the archive's offset 0 to end of file;
                local header offsets are resolved against it so archives
                with data prepended still load.

        Raises:
            Exception: on a bad header signature, an unsupported
                compression method, or a CRC32 mismatch.
        """
        self.files = {}
        for i in range(entcount):
            # Read the Central Directory magic.
            if fp.read(4) != self._SIG_CDIR:
                raise Exception("Central Directory header incorrect for file %i/%i"
                    % (i+1, entcount))
            # Fixed part of the header (42 bytes after the magic).  Only
            # some of these fields are used; the CRC is read unsigned.
            (vermade, verneeded, flags, cm, lastmodtime, lastmoddate,
                crc32, cmpsize, uncmpsize, fnlen, eflen, fclen, diskstart,
                intfa, extfa, lhoffs) = struct.unpack(
                    "<HHHHHHIIIHHHHHII", fp.read(42))
            fname = fp.read(fnlen)
            # Step over the extra field and the file comment.
            fp.read(eflen)
            fp.read(fclen)
            if b".DS_Store" in fname or b"__MACOSX/" in fname:
                print("Skipping %s" % (repr(fname),))
                continue
            # Do some checks.
            if not (verneeded == 20 or (verneeded < 20 and cm == 0)
                    or (verneeded == 10 and cm == 8)):
                # Apple's zip utility always sets the version needed to 1.0,
                # even when deflate / directories are used, which is why
                # version 1.0 + deflate is tolerated above.
                print("File %i %s" % (cm, repr(fname)))
                print("Zip version %i.%i" % (verneeded//10, verneeded%10, ))
                print("WARNING: Zip version != 2.0 found. This might not unpack correctly!")
            # We'll be right back after this jump.
            tempfpos = fp.tell()
            udata = self._read_member(fp, (-ziplen) + lhoffs, cm, cmpsize,
                crc32, i+1, entcount)
            # Drop it in.
            self.files[fname] = FileEnt(udata, lastmodtime, lastmoddate)
            # Aaaaand we're back!
            fp.seek(tempfpos)

    def _read_member(self, fp, endoffs, cm, cmpsize, crc32, filenum, entcount):
        """Reads, unpacks and CRC-checks one member's data.

        Args:
            fp: seekable binary file object.
            endoffs: position of the local header relative to end of file.
            cm, cmpsize, crc32: compression method, compressed size and
                (unsigned) CRC taken from the central directory.
            filenum, entcount: 1-based entry number and total, used only
                in error messages.

        Returns:
            The uncompressed data.
        """
        fp.seek(endoffs, SEEK_END)
        # Check if we have a local header.
        if fp.read(4) != self._SIG_LOCAL:
            raise Exception("Local header not found for file %i/%i"
                % (filenum, entcount))
        # Read the local header (26 bytes after the magic).
        # TODO: check the integrity of these against the Central Directory
        # - we're just using what the CD tells us for now
        (lhverneeded, lhflags, lhcm, lhlastmodtime, lhlastmoddate,
            lhcrc, lhcmpsize, lhuncmpsize, lhfnlen, lheflen) = struct.unpack(
                "<HHHHHIIIHH", fp.read(26))
        # The data follows the local copy of the name and extra field.
        fp.read(lhfnlen)
        fp.read(lheflen)
        # Unpack.
        cdata = fp.read(cmpsize)
        if cm == 0:
            # Method 0: Store
            udata = cdata
        elif cm == 8:
            # Method 8: Deflate (raw stream, hence the negative wbits)
            udata = zlib.decompress(cdata, -15)
        else:
            raise Exception("Compression method %i not supported" % cm)
        # Check the CRC32 for corruption.  zlib.crc32 may return a signed
        # value depending on the Python version; mask so that it always
        # compares correctly against the unsigned header field.
        calccrc = zlib.crc32(udata) & 0xFFFFFFFF
        if calccrc != crc32:
            print("%i %i %i" % (calccrc, lhcrc, crc32))
            raise Exception("CRC32 mismatch")
        return udata

    def write(self, fname):
        """Writes a zip file to a file name or file object.

        A file opened here from a name is always closed, even when
        writing fails; a file object passed in by the caller is left open.

        Raises:
            Exception: "Cannot write to nowhere" if fname is None.
        """
        if fname is None:
            raise Exception("Cannot write to nowhere")
        if not self._is_path(fname):
            self._write_to(fname)
            return
        fp = open(fname, "wb")
        try:
            self._write_to(fp)
        finally:
            fp.close()

    def _write_to(self, fp):
        """Writes every entry, the central directory and the EOF record."""
        # Get the filenames into a sane order
        fnlist = sorted(self.files)
        # Central directory records are collected here and joined once at
        # the end, rather than growing one string entry by entry.
        cdir = []
        # Now write away. Start with the local headers...
        for name in fnlist:
            fent = self.files[name]
            if isinstance(name, bytes):
                rawname = name
                flags = 0
            else:
                rawname = name.encode("utf-8")
                # General purpose bit 11 marks the name as UTF-8; it is
                # only needed when the name is not plain ASCII.
                flags = 0x0800 if len(rawname) != len(name) else 0
            # TODO: Properly determine if this is a directory earlier
            isdir = rawname.endswith(b"/")
            udata = fent.data
            # Default: stored, version needed 1.0.
            cm, verneeded, cdata = 0, 10, udata
            if not isdir:
                # Compress, skipping the 2-byte zlib header and the
                # 4-byte Adler-32 trailer to leave a raw deflate stream.
                packed = zlib.compress(udata, 9)[2:-4]
                # Only keep the deflated form when it actually saves space.
                if len(packed) < len(udata):
                    cm, verneeded, cdata = 8, 20, packed
            lhoffs = fp.tell()
            # Fields shared verbatim by the local and central headers:
            # version needed through file name length.
            common = struct.pack("<HHHHHIIIH", verneeded, flags, cm,
                fent.lastmodtime, fent.lastmoddate,
                zlib.crc32(udata) & 0xFFFFFFFF, len(cdata), len(udata),
                len(rawname))
            # Local header (extra field length 0), name, then the data.
            fp.write(self._SIG_LOCAL + common + struct.pack("<H", 0))
            fp.write(rawname)
            fp.write(cdata)
            # Central header: version made by, the shared fields, then
            # extra length, comment length, disk start, internal and
            # external attributes (all 0) and the local header offset.
            # TODO: file attributes
            cdir.append(self._SIG_CDIR + struct.pack("<H", 0x0300 + 20)
                + common + struct.pack("<HHHHII", 0, 0, 0, 0, 0, lhoffs)
                + rawname)
        # Write the Central Directory to the file
        cdiroffs = fp.tell()
        cdirdata = b"".join(cdir)
        fp.write(cdirdata)
        # Now write the EOF signature (single disk, empty comment)
        fp.write(struct.pack("<4sHHHHIIH", self._SIG_EOF, 0, 0,
            len(fnlist), len(fnlist), len(cdirdata), cdiroffs, 0))
def main(argv=None):
    """Merges the zip files named in argv into "merged.zip".

    Archives are loaded in order; when a member name occurs in more than
    one archive the later one wins and an "Overwriting" notice is printed.

    Args:
        argv: list of input zip file names; defaults to sys.argv[1:].
    """
    if argv is None:
        argv = sys.argv[1:]
    zb = ZipFile()
    for fname in argv:
        print(fname)
        zf = ZipFile(fname)
        for (name, item) in zf.files.items():
            # The entry is stored either way; only the notice depends on
            # whether an earlier archive already supplied this name.
            if name in zb.files:
                print("Overwriting %s" % (repr(name),))
            zb.files[name] = item
    zb.write("merged.zip")


# Only run the merge when executed as a script, so that importing this
# module for its classes does not read sys.argv or write merged.zip.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment