Skip to content

Instantly share code, notes, and snippets.

@lambdafu
Forked from piranna/multiPageTiff.py
Last active September 28, 2016 15:03
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save lambdafu/fc3ef0863d6b986bc74030f92bb6154f to your computer and use it in GitHub Desktop.
Save lambdafu/fc3ef0863d6b986bc74030f92bb6154f to your computer and use it in GitHub Desktop.
# AppendingTiffWriter
# -*- coding: utf-8 -*-
#
# "file object wrapper class" that is able to append a TIF to an existing one
#
# Code by @vashek (https://github.com/vashek) published at
# https://github.com/python-pillow/Pillow/issues/733#issuecomment-249380397
#
# Fixed PEP8 and clean by Jesús Leganés-Combarro 'piranna' <piranna@gmail.com>
from enum import IntEnum
from io import open, SEEK_CUR, SEEK_END, SEEK_SET
from struct import pack, unpack
try:
from logging import trace
except ImportError as e:
def trace(*args, **kwargs):
pass
class AppendingTiffWriter:
fieldSizes = [
0, # None
1, # byte
1, # ascii
2, # short
4, # long
8, # rational
1, # sbyte
1, # undefined
2, # sshort
4, # slong
8, # srational
4, # float
8, # double
]
class Tags(IntEnum):
StripOffsets = 273
FreeOffsets = 288
TileOffsets = 324
JPEGQTables = 519
JPEGDCTables = 520
JPEGACTables = 521
def __init__(self, fn, new=False):
self.whereToWriteNewIFDOffset = None
self.offsetOfNewPage = 0
self.name = fn
mode = "w+b" if new else "r+b"
if hasattr(fn, 'read'):
self.f = fn
else:
try:
self.f = open(fn, mode)
except IOError:
self.f = open(fn, "w+b")
self.IIMM = IIMM = self.f.read(4)
if not IIMM:
# empty file - first page
self.isFirst = True
return
self.isFirst = False
if IIMM == "II\x2a\x00":
self.setEndian("<")
elif IIMM == "MM\x00\x2a":
self.setEndian(">")
else:
raise RuntimeError("Invalid TIFF file header")
self.skipIFDs()
self.goToEnd()
def __enter__(self):
return self
def __exit__(self, exc_type, exc_value, traceback):
self.close()
return False
def tell(self):
return self.f.tell() - self.offsetOfNewPage
def seek(self, offset, whence):
if whence == SEEK_SET:
offset += self.offsetOfNewPage
self.f.seek(offset, whence)
return self.tell()
def goToEnd(self):
self.f.seek(0, SEEK_END)
pos = self.f.tell()
# pad to 16 byte boundary
padBytes = 16 - pos % 16
if 0 < padBytes < 16:
self.f.write(bytes(bytearray(padBytes)))
self.offsetOfNewPage = self.f.tell()
def setEndian(self, endian):
self.endian = endian
self.longFmt = self.endian + "L"
self.shortFmt = self.endian + "H"
self.tagFormat = self.endian + "HHL"
def skipIFDs(self):
while True:
IFDoffset = self.readLong()
if IFDoffset == 0:
self.whereToWriteNewIFDOffset = self.f.tell() - 4
break
self.f.seek(IFDoffset)
numTags = self.readShort()
self.f.seek(numTags * 12, SEEK_CUR)
def write(self, data):
return self.f.write(data)
def readShort(self):
value, = unpack(self.shortFmt, self.f.read(2))
return value
def readLong(self):
value, = unpack(self.longFmt, self.f.read(4))
return value
def rewriteLastShortToLong(self, value):
self.f.seek(-2, SEEK_CUR)
bytesWritten = self.f.write(pack(self.longFmt, value))
if bytesWritten != 4:
raise RuntimeError("wrote only %u bytes but wanted 4" %
bytesWritten)
def rewriteLastShort(self, value):
self.f.seek(-2, SEEK_CUR)
bytesWritten = self.f.write(pack(self.shortFmt, value))
if bytesWritten != 2:
raise RuntimeError("wrote only %u bytes but wanted 2" %
bytesWritten)
def rewriteLastLong(self, value):
self.f.seek(-4, SEEK_CUR)
bytesWritten = self.f.write(pack(self.longFmt, value))
if bytesWritten != 4:
raise RuntimeError("wrote only %u bytes but wanted 4" %
bytesWritten)
def writeShort(self, value):
bytesWritten = self.f.write(pack(self.shortFmt, value))
if bytesWritten != 2:
raise RuntimeError("wrote only %u bytes but wanted 2" %
bytesWritten)
def writeLong(self, value):
bytesWritten = self.f.write(pack(self.longFmt, value))
if bytesWritten != 4:
raise RuntimeError("wrote only %u bytes but wanted 4" %
bytesWritten)
def close(self):
if not self.isFirst:
# fix offsets
self.f.seek(self.offsetOfNewPage)
IIMM = self.f.read(4)
if not IIMM:
raise RuntimeError("nothing written into new page")
if IIMM != self.IIMM:
raise RuntimeError("IIMM of new page doesn't match IIMM of "
"first page")
IFDoffset = self.readLong()
IFDoffset += self.offsetOfNewPage
self.f.seek(self.whereToWriteNewIFDOffset)
self.writeLong(IFDoffset)
self.f.seek(IFDoffset)
self.fixIFD()
self.f.close()
def fixIFD(self):
numTags = self.readShort()
trace("fixing IFD at %X; number of tags: %u (0x%X)", self.f.tell()-2,
numTags, numTags)
edit_tags = set(self.Tags)
for i in range(numTags):
tag, fieldType, count = unpack(self.tagFormat, self.f.read(8))
trace(" at %X: tag %u (0x%X), type %u, count %u", self.f.tell()-8,
tag, tag, fieldType, count)
fieldSize = self.fieldSizes[fieldType]
totalSize = fieldSize * count
isLocal = (totalSize <= 4)
if not isLocal:
offset = self.readLong()
offset += self.offsetOfNewPage
self.rewriteLastLong(offset)
if tag in edit_tags:
curPos = self.f.tell()
if isLocal:
self.fixOffsets(count, isShort=(fieldSize == 2),
isLong=(fieldSize == 4))
self.f.seek(curPos + 4)
else:
self.f.seek(offset)
self.fixOffsets(count, isShort=(fieldSize == 2),
isLong=(fieldSize == 4))
self.f.seek(curPos)
offset = curPos = None
elif isLocal:
# skip the locally stored value that is not an offset
self.f.seek(4, SEEK_CUR)
def fixOffsets(self, count, isShort=False, isLong=False):
if not isShort and not isLong:
raise RuntimeError("offset is neither short nor long")
for i in range(count):
offset = self.readShort() if isShort else self.readLong()
offset += self.offsetOfNewPage
if isShort and offset >= 65536:
# offset is now too large - we must convert shorts to longs
if count != 1:
raise RuntimeError("not implemented") # XXX TODO
# simple case - the offset is just one and therefore it is
# local (not referenced with another offset)
self.rewriteLastShortToLong(offset)
self.f.seek(-10, SEEK_CUR)
self.writeShort(4) # rewrite the type to LONG
self.f.seek(8, SEEK_CUR)
elif isShort:
self.rewriteLastShort(offset)
else:
self.rewriteLastLong(offset)
if __name__ == '__main__':
from PIL import Image
import os
testfile = "test.tiff"
if os.path.exists(testfile):
os.unlink(testfile)
with Image.open("multipage.tiff") as ti:
for idx in range(ti.n_frames):
ti.seek(idx)
print ("%i: %s %s" % (idx, repr(ti.size), ti.mode))
with AppendingTiffWriter(testfile) as tf:
ti.save(tf)
@piranna
Copy link

piranna commented Sep 28, 2016

Could you explain how the test works? Seems that's picking the current number of frames and it's adding them, duplicating the number of frames of the file, isn't it?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment