-
-
Save lambdafu/fc3ef0863d6b986bc74030f92bb6154f to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# AppendingTiffWriter | |
# -*- coding: utf-8 -*- | |
# | |
# "file object wrapper class" that is able to append a TIF to an existing one | |
# | |
# Code by @vashek (https://github.com/vashek) published at | |
# https://github.com/python-pillow/Pillow/issues/733#issuecomment-249380397 | |
# | |
# Fixed PEP8 and clean by Jesús Leganés-Combarro 'piranna' <piranna@gmail.com> | |
from enum import IntEnum | |
from io import open, SEEK_CUR, SEEK_END, SEEK_SET | |
from struct import pack, unpack | |
# Use a ``trace`` function from ``logging`` when the running environment
# provides one; otherwise fall back to a stand-in that discards its arguments
# so the tracing calls in this module stay harmless.
try:
    from logging import trace
except ImportError:
    def trace(*args, **kwargs):
        """Accept any arguments, do nothing, and return ``None``."""
        return None
class AppendingTiffWriter:
    """File-object wrapper that appends a TIFF page to an existing TIFF file.

    On construction the wrapper walks the chain of IFDs already present in
    the file, remembers where the terminating "next IFD" offset (zero) is
    stored, pads the file to a 16-byte boundary and records that position as
    the start of the new page.  ``tell`` and absolute ``seek`` are reported
    relative to that start, so whatever writes through this object behaves
    as if it were writing at offset zero.  ``close`` then links the new page
    into the IFD chain and shifts the offsets stored in the new page's IFD
    by the page start.
    """

    # Size in bytes of a single value of each field type, indexed by the
    # numeric type code read from an IFD entry.
    fieldSizes = [
        0,  # None
        1,  # byte
        1,  # ascii
        2,  # short
        4,  # long
        8,  # rational
        1,  # sbyte
        1,  # undefined
        2,  # sshort
        4,  # slong
        8,  # srational
        4,  # float
        8,  # double
    ]

    class Tags(IntEnum):
        """Tags whose values are themselves file offsets needing relocation."""
        StripOffsets = 273
        FreeOffsets = 288
        TileOffsets = 324
        JPEGQTables = 519
        JPEGDCTables = 520
        JPEGACTables = 521

    def __init__(self, fn, new=False):
        """Open *fn* and position the wrapper at the start of a new page.

        :param fn: a file name, or an object with a ``read`` attribute that
            is used directly as the underlying file.
        :param new: when *fn* is a name, open it with ``"w+b"`` instead of
            ``"r+b"``.
        :raises RuntimeError: if the file is not empty and does not start
            with a little- or big-endian TIFF header.
        """
        self.whereToWriteNewIFDOffset = None
        self.offsetOfNewPage = 0
        self.name = fn

        mode = "w+b" if new else "r+b"
        if hasattr(fn, 'read'):
            self.f = fn
        else:
            try:
                self.f = open(fn, mode)
            except IOError:
                # could not open with the requested mode - start a fresh file
                self.f = open(fn, "w+b")

        self.IIMM = IIMM = self.f.read(4)
        if not IIMM:
            # empty file - first page
            self.isFirst = True
            return

        self.isFirst = False
        # The file is read in binary mode, so the header must be compared
        # against bytes literals; str literals never match on Python 3.
        if IIMM == b"II\x2a\x00":
            self.setEndian("<")
        elif IIMM == b"MM\x00\x2a":
            self.setEndian(">")
        else:
            raise RuntimeError("Invalid TIFF file header")

        self.skipIFDs()
        self.goToEnd()

    def __enter__(self):
        """Return the writer itself for use in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the writer; exceptions from the ``with`` body propagate."""
        self.close()
        return False

    def tell(self):
        """Return the current position relative to the start of the new page."""
        return self.f.tell() - self.offsetOfNewPage

    def seek(self, offset, whence=SEEK_SET):
        """Move the file position and return the new page-relative position.

        Absolute offsets (``SEEK_SET``) are shifted by the start of the new
        page; other *whence* values are passed to the underlying file as is.
        """
        if whence == SEEK_SET:
            offset += self.offsetOfNewPage
        self.f.seek(offset, whence)
        return self.tell()

    def goToEnd(self):
        """Seek to the end of the file, pad it, and mark the new page start."""
        self.f.seek(0, SEEK_END)
        pos = self.f.tell()

        # pad to 16 byte boundary
        padBytes = 16 - pos % 16
        if 0 < padBytes < 16:
            self.f.write(bytes(bytearray(padBytes)))
        self.offsetOfNewPage = self.f.tell()

    def setEndian(self, endian):
        """Store the ``struct`` byte-order prefix and the derived formats."""
        self.endian = endian
        self.longFmt = self.endian + "L"
        self.shortFmt = self.endian + "H"
        self.tagFormat = self.endian + "HHL"

    def skipIFDs(self):
        """Follow the IFD chain until the zero "next IFD" offset is found.

        The file position of that zero offset is stored in
        ``whereToWriteNewIFDOffset`` so ``close`` can overwrite it.
        """
        while True:
            IFDoffset = self.readLong()
            if IFDoffset == 0:
                self.whereToWriteNewIFDOffset = self.f.tell() - 4
                break

            self.f.seek(IFDoffset)
            numTags = self.readShort()
            # each IFD entry is 12 bytes; the next-IFD offset follows them
            self.f.seek(numTags * 12, SEEK_CUR)

    def write(self, data):
        """Write *data* to the underlying file and return its result."""
        return self.f.write(data)

    def readShort(self):
        """Read a 2-byte unsigned integer using the file's byte order."""
        value, = unpack(self.shortFmt, self.f.read(2))
        return value

    def readLong(self):
        """Read a 4-byte unsigned integer using the file's byte order."""
        value, = unpack(self.longFmt, self.f.read(4))
        return value

    def _writePacked(self, fmt, value, size):
        """Pack *value* with *fmt*, write it, and check *size* bytes went out.

        :raises RuntimeError: if the underlying ``write`` reports a byte
            count different from *size*.
        """
        bytesWritten = self.f.write(pack(fmt, value))
        # Some file-like objects return None from write(); only a reported
        # count that differs from the expected size is treated as an error.
        if bytesWritten is not None and bytesWritten != size:
            raise RuntimeError("wrote only %u bytes but wanted %u" %
                               (bytesWritten, size))

    def rewriteLastShortToLong(self, value):
        """Step back 2 bytes and write *value* as a 4-byte integer."""
        self.f.seek(-2, SEEK_CUR)
        self._writePacked(self.longFmt, value, 4)

    def rewriteLastShort(self, value):
        """Step back 2 bytes and overwrite them with *value*."""
        self.f.seek(-2, SEEK_CUR)
        self._writePacked(self.shortFmt, value, 2)

    def rewriteLastLong(self, value):
        """Step back 4 bytes and overwrite them with *value*."""
        self.f.seek(-4, SEEK_CUR)
        self._writePacked(self.longFmt, value, 4)

    def writeShort(self, value):
        """Write *value* as a 2-byte integer at the current position."""
        self._writePacked(self.shortFmt, value, 2)

    def writeLong(self, value):
        """Write *value* as a 4-byte integer at the current position."""
        self._writePacked(self.longFmt, value, 4)

    def close(self):
        """Link the new page into the IFD chain and close the file.

        For the first page of a file nothing needs fixing.  The underlying
        file is closed even when fixing the offsets fails.

        :raises RuntimeError: if nothing was written into the new page or
            its byte-order header differs from the first page's.
        """
        try:
            if not self.isFirst:
                # fix offsets
                self.f.seek(self.offsetOfNewPage)

                IIMM = self.f.read(4)
                if not IIMM:
                    raise RuntimeError("nothing written into new page")
                if IIMM != self.IIMM:
                    raise RuntimeError("IIMM of new page doesn't match IIMM "
                                       "of first page")

                # The new page stores its IFD offset relative to its own
                # start; convert it to an absolute file offset.
                IFDoffset = self.readLong()
                IFDoffset += self.offsetOfNewPage
                self.f.seek(self.whereToWriteNewIFDOffset)
                self.writeLong(IFDoffset)
                self.f.seek(IFDoffset)
                self.fixIFD()
        finally:
            self.f.close()

    def fixIFD(self):
        """Shift every offset stored in the IFD at the current position.

        Values that do not fit in the 4-byte value field are stored
        elsewhere and referenced by an offset, which is shifted by the start
        of the new page.  For the tags listed in ``Tags`` the values
        themselves are offsets and are shifted as well.
        """
        numTags = self.readShort()
        trace("fixing IFD at %X; number of tags: %u (0x%X)", self.f.tell()-2,
              numTags, numTags)

        edit_tags = set(self.Tags)
        for _ in range(numTags):
            tag, fieldType, count = unpack(self.tagFormat, self.f.read(8))
            trace("  at %X: tag %u (0x%X), type %u, count %u", self.f.tell()-8,
                  tag, tag, fieldType, count)

            fieldSize = self.fieldSizes[fieldType]
            totalSize = fieldSize * count
            # values of at most 4 bytes live inside the entry itself
            isLocal = (totalSize <= 4)
            if not isLocal:
                offset = self.readLong()
                offset += self.offsetOfNewPage
                self.rewriteLastLong(offset)

            if tag in edit_tags:
                curPos = self.f.tell()

                if isLocal:
                    self.fixOffsets(count, isShort=(fieldSize == 2),
                                    isLong=(fieldSize == 4))
                    # continue right after the 4-byte value field
                    self.f.seek(curPos + 4)
                else:
                    self.f.seek(offset)
                    self.fixOffsets(count, isShort=(fieldSize == 2),
                                    isLong=(fieldSize == 4))
                    self.f.seek(curPos)
            elif isLocal:
                # skip the locally stored value that is not an offset
                self.f.seek(4, SEEK_CUR)

    def fixOffsets(self, count, isShort=False, isLong=False):
        """Shift *count* consecutive offsets at the current file position.

        :param count: number of offsets to rewrite.
        :param isShort: the offsets are stored as 2-byte values.
        :param isLong: the offsets are stored as 4-byte values.
        :raises RuntimeError: if neither size is selected, or if several
            short offsets would need widening (not implemented).
        """
        if not isShort and not isLong:
            raise RuntimeError("offset is neither short nor long")

        for _ in range(count):
            offset = self.readShort() if isShort else self.readLong()
            offset += self.offsetOfNewPage
            if isShort and offset >= 65536:
                # offset is now too large - we must convert shorts to longs
                if count != 1:
                    raise RuntimeError("not implemented")  # XXX TODO

                # simple case - the offset is just one and therefore it is
                # local (not referenced with another offset)
                self.rewriteLastShortToLong(offset)
                # go back to the entry's type field and mark it as LONG
                self.f.seek(-10, SEEK_CUR)
                self.writeShort(4)  # rewrite the type to LONG
                # skip count and value to land at the end of the entry
                self.f.seek(8, SEEK_CUR)
            elif isShort:
                self.rewriteLastShort(offset)
            else:
                self.rewriteLastLong(offset)
if __name__ == '__main__':
    # Demo: copy every frame of "multipage.tiff" into a fresh "test.tiff",
    # reopening the output through AppendingTiffWriter for each frame.
    import os
    from PIL import Image

    testfile = "test.tiff"
    # start from a clean output file
    if os.path.exists(testfile):
        os.remove(testfile)

    with Image.open("multipage.tiff") as ti:
        frame_count = ti.n_frames
        for idx in range(frame_count):
            ti.seek(idx)
            summary = "%i: %s %s" % (idx, repr(ti.size), ti.mode)
            print(summary)
            with AppendingTiffWriter(testfile) as tf:
                ti.save(tf)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Could you explain how the test works? It seems to read the current number of frames and then append each of them, which would double the number of frames in the file, wouldn't it?