Skip to content

Instantly share code, notes, and snippets.

@piranna
Created September 27, 2016 19:42
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save piranna/4c0787a12cf5885eb36a5b561e3347fc to your computer and use it in GitHub Desktop.
Save piranna/4c0787a12cf5885eb36a5b561e3347fc to your computer and use it in GitHub Desktop.
# AppendingTiffWriter
#
# File-object wrapper class that can append a TIFF page to an existing TIFF
# file.
#
# Code by @vashek (https://github.com/vashek), published at
# https://github.com/python-pillow/Pillow/issues/733#issuecomment-249380397
#
# PEP 8 fixes and cleanup by Jesús Leganés-Combarro 'piranna' <piranna@gmail.com>
import logging
from enum import Enum
from io import open, SEEK_CUR, SEEK_END, SEEK_SET
from struct import pack, unpack

try:
    # NOTE: the standard-library ``logging`` module does not export ``trace``;
    # the import is kept for environments that patch it in, with a fallback
    # so that this module stays importable everywhere else.
    from logging import trace
except ImportError:
    trace = logging.getLogger(__name__).debug


class AppendingTiffWriter:
    """File-object wrapper that appends a TIFF page to an existing TIFF file.

    The wrapper opens ``fn`` and, when the file already holds a TIFF, walks
    its IFD chain, pads the file to a 16-byte boundary and remembers that
    position as the start of the new page.  ``tell()`` and absolute
    ``seek()`` calls are expressed relative to that start, so a writer can
    emit a complete stand-alone TIFF through this object.  ``close()`` then
    links the new page into the IFD chain of the existing file and shifts
    every offset stored in the new page's IFD by the page start.

    When the file is empty (or ``new`` is true) the data is written
    unchanged as the first page.

    Usable as a context manager; the file is closed on exit.
    """

    # Size in bytes of a single value of each TIFF field type, indexed by
    # the numeric field type code.
    fieldSizes = [
        0,  # None
        1,  # byte
        1,  # ascii
        2,  # short
        4,  # long
        8,  # rational
        1,  # sbyte
        1,  # undefined
        2,  # sshort
        4,  # slong
        8,  # srational
        4,  # float
        8,  # double
    ]

    class Tags(Enum):
        """Tags whose values are themselves file offsets."""

        StripOffsets = 273
        FreeOffsets = 288
        TileOffsets = 324
        JPEGQTables = 519
        JPEGDCTables = 520
        JPEGACTables = 521

    # Raw integer values of ``Tags``.  Tag numbers read from the file are
    # plain ints, and ``int in EnumClass`` is not a reliable membership test
    # across Python versions, so membership is checked against this set.
    _offsetTags = frozenset(tag.value for tag in Tags)

    _logger = logging.getLogger(__name__)

    def __init__(self, fn, new=False):
        """Open ``fn`` and position the file at the start of the new page.

        :param fn: path of the TIFF file to create or append to.
        :param new: when true, open with ``"w+b"`` (truncating the file)
            instead of ``"r+b"``.
        :raises RuntimeError: if the file is non-empty but does not start
            with a little- or big-endian TIFF header.
        """
        self.whereToWriteNewIFDOffset = None
        self.offsetOfNewPage = 0
        self.name = fn

        mode = "w+b" if new else "r+b"
        try:
            self.f = open(fn, mode)
        except IOError:
            # "r+b" fails when the file does not exist yet: create it.
            self.f = open(fn, "w+b")

        try:
            self.IIMM = IIMM = self.f.read(4)
            if not IIMM:
                # empty file - first page
                self.isFirst = True
                return

            self.isFirst = False
            # The file is opened in binary mode, so the header must be
            # compared against bytes literals.
            if IIMM == b"II\x2a\x00":
                self.setEndian("<")
            elif IIMM == b"MM\x00\x2a":
                self.setEndian(">")
            else:
                raise RuntimeError("Invalid TIFF file header")

            self.skipIFDs()
            self.goToEnd()
        except Exception:
            # Do not leak the file handle when the file cannot be used.
            self.f.close()
            raise

    def __enter__(self):
        """Return the writer itself for use in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the writer; exceptions from the ``with`` body propagate."""
        self.close()
        return False

    def tell(self):
        """Return the current position relative to the start of the new page."""
        return self.f.tell() - self.offsetOfNewPage

    def seek(self, offset, whence=SEEK_SET):
        """Move the file position and return the new page-relative position.

        Absolute (``SEEK_SET``) offsets are interpreted relative to the start
        of the new page; other ``whence`` values are passed through as-is.
        """
        if whence == SEEK_SET:
            offset += self.offsetOfNewPage
        self.f.seek(offset, whence)
        return self.tell()

    def goToEnd(self):
        """Seek to the end of the file, pad it to a 16-byte boundary and
        record the resulting position as the start of the new page."""
        self.f.seek(0, SEEK_END)
        padBytes = -self.f.tell() % 16
        if padBytes:
            self.f.write(b"\x00" * padBytes)
        self.offsetOfNewPage = self.f.tell()

    def setEndian(self, endian):
        """Store the ``struct`` byte-order prefix and derive the formats used
        for shorts, longs and the first 8 bytes of an IFD entry."""
        self.endian = endian
        self.longFmt = endian + "L"
        self.shortFmt = endian + "H"
        self.tagFormat = endian + "HHL"

    def skipIFDs(self):
        """Follow the IFD chain until a zero "next IFD" offset is found and
        remember where that zero is stored, so ``close()`` can overwrite it."""
        while True:
            IFDoffset = self.readLong()
            if IFDoffset == 0:
                self.whereToWriteNewIFDOffset = self.f.tell() - 4
                break
            self.f.seek(IFDoffset)
            numTags = self.readShort()
            # every IFD entry is 12 bytes long
            self.f.seek(numTags * 12, SEEK_CUR)

    def write(self, data):
        """Write ``data`` at the current position and return the result of
        the underlying file's ``write``."""
        return self.f.write(data)

    def readShort(self):
        """Read a 2-byte unsigned integer in the file's byte order."""
        value, = unpack(self.shortFmt, self.f.read(2))
        return value

    def readLong(self):
        """Read a 4-byte unsigned integer in the file's byte order."""
        value, = unpack(self.longFmt, self.f.read(4))
        return value

    def _writeChecked(self, fmt, value, size):
        """Pack ``value`` with ``fmt``, write it and verify that exactly
        ``size`` bytes were written.

        :raises RuntimeError: if the number of bytes written differs.
        """
        bytesWritten = self.f.write(pack(fmt, value))
        if bytesWritten != size:
            raise RuntimeError("wrote only %u bytes but wanted %u" %
                               (bytesWritten, size))

    def rewriteLastShortToLong(self, value):
        """Step back over the short just read and write ``value`` as a long
        in its place."""
        self.f.seek(-2, SEEK_CUR)
        self._writeChecked(self.longFmt, value, 4)

    def rewriteLastShort(self, value):
        """Overwrite the short that was just read with ``value``."""
        self.f.seek(-2, SEEK_CUR)
        self._writeChecked(self.shortFmt, value, 2)

    def rewriteLastLong(self, value):
        """Overwrite the long that was just read with ``value``."""
        self.f.seek(-4, SEEK_CUR)
        self._writeChecked(self.longFmt, value, 4)

    def writeShort(self, value):
        """Write ``value`` as a 2-byte unsigned integer."""
        self._writeChecked(self.shortFmt, value, 2)

    def writeLong(self, value):
        """Write ``value`` as a 4-byte unsigned integer."""
        self._writeChecked(self.longFmt, value, 4)

    def close(self):
        """Link the new page into the existing file and close the file.

        For an appended page, the zero "next IFD" offset found by
        ``skipIFDs`` is replaced by the absolute offset of the new page's
        IFD, and the offsets inside that IFD are fixed.  The file is closed
        even if this fails.  Calling ``close()`` again is a no-op.

        :raises RuntimeError: if nothing was written into the new page or
            its header differs from the header of the first page.
        """
        if self.f.closed:
            return
        try:
            if not self.isFirst:
                # fix offsets
                self.f.seek(self.offsetOfNewPage)
                IIMM = self.f.read(4)
                if not IIMM:
                    raise RuntimeError("nothing written into new page")
                if IIMM != self.IIMM:
                    raise RuntimeError("IIMM of new page doesn't match IIMM "
                                       "of first page")

                IFDoffset = self.readLong() + self.offsetOfNewPage
                self.f.seek(self.whereToWriteNewIFDOffset)
                self.writeLong(IFDoffset)
                self.f.seek(IFDoffset)
                self.fixIFD()
        finally:
            self.f.close()

    def fixIFD(self):
        """Shift every offset stored in the IFD at the current position by
        ``offsetOfNewPage``.

        Values larger than 4 bytes are stored out of line, so the entry's
        value field is an offset and is always shifted.  For tags listed in
        ``Tags`` the values themselves are offsets and are shifted as well.
        An unknown field type raises ``IndexError`` from the ``fieldSizes``
        lookup.
        """
        numTags = self.readShort()
        self._logger.debug(
            "fixing IFD at %X; number of tags: %u (0x%X)",
            self.f.tell() - 2, numTags, numTags)

        for _ in range(numTags):
            tag, fieldType, count = unpack(self.tagFormat, self.f.read(8))
            self._logger.debug(
                " at %X: tag %u (0x%X), type %u, count %u",
                self.f.tell() - 8, tag, tag, fieldType, count)

            fieldSize = self.fieldSizes[fieldType]
            isLocal = fieldSize * count <= 4

            if not isLocal:
                offset = self.readLong() + self.offsetOfNewPage
                self.rewriteLastLong(offset)

            if tag in self._offsetTags:
                curPos = self.f.tell()
                if isLocal:
                    # the offsets live in the entry's own 4-byte value field
                    self.fixOffsets(count, isShort=(fieldSize == 2),
                                    isLong=(fieldSize == 4))
                    self.f.seek(curPos + 4)
                else:
                    # the offsets live in an out-of-line array
                    self.f.seek(offset)
                    self.fixOffsets(count, isShort=(fieldSize == 2),
                                    isLong=(fieldSize == 4))
                    self.f.seek(curPos)
            elif isLocal:
                # skip the locally stored value that is not an offset
                self.f.seek(4, SEEK_CUR)

    def fixOffsets(self, count, isShort=False, isLong=False):
        """Shift ``count`` consecutive offsets at the current position by
        ``offsetOfNewPage``.

        :param count: number of offsets to rewrite.
        :param isShort: the offsets are stored as 2-byte values.
        :param isLong: the offsets are stored as 4-byte values.
        :raises RuntimeError: if neither ``isShort`` nor ``isLong`` is set,
            or if a short offset overflows and ``count`` is not 1.
        """
        if not isShort and not isLong:
            raise RuntimeError("offset is neither short nor long")

        for _ in range(count):
            offset = self.readShort() if isShort else self.readLong()
            offset += self.offsetOfNewPage

            if isShort and offset >= 65536:
                # offset is now too large - we must convert shorts to longs
                if count != 1:
                    raise RuntimeError("not implemented")  # XXX TODO

                # simple case - the offset is just one and therefore it is
                # local (not referenced with another offset)
                self.rewriteLastShortToLong(offset)
                # step back to the entry's type field (10 bytes before the
                # end of the value field) and mark the entry as LONG
                self.f.seek(-10, SEEK_CUR)
                self.writeShort(4)  # rewrite the type to LONG
                self.f.seek(8, SEEK_CUR)
            elif isShort:
                self.rewriteLastShort(offset)
            else:
                self.rewriteLastLong(offset)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment