Last active
December 28, 2017 12:08
-
-
Save mgd020/663466a8a615100bd373a0382d6e2547 to your computer and use it in GitHub Desktop.
Extracts the map image from a Quick Chart (.QCT) file and saves it as a PNG.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
# REQUIRES pillow-simd or pillow | |
from __future__ import absolute_import, division, print_function, unicode_literals | |
# import cProfile | |
# import cStringIO | |
import datetime | |
import itertools | |
import os | |
# import pstats | |
import struct | |
import sys | |
from PIL import Image | |
# Cache of compiled struct.Struct objects keyed by format string, so each
# format is parsed only once no matter how many values are read with it.
structs = {}


def unpack(stream, format):
    """Read a single value described by ``format`` from ``stream``.

    Args:
        stream: Binary file-like object positioned at the value to read.
        format: ``struct`` format string. Only the first unpacked field is
            returned, so it should describe exactly one value.

    Returns:
        The first field unpacked from the bytes that were read.

    Raises:
        EOFError: If the stream ends before the full value could be read.
    """
    try:
        s = structs[format]
    except KeyError:
        structs[format] = s = struct.Struct(format)
    raw = stream.read(s.size)
    # An explicit check (rather than ``assert``) survives ``python -O`` and
    # also catches a truncated, non-empty read.
    if len(raw) != s.size:
        raise EOFError('unexpected eof')
    return s.unpack(raw)[0]
def grouper(iterable, n, fillvalue=None):
    """Collect data into fixed-length chunks or blocks.

    Example: grouper('ABCDEFG', 3, 'x') --> ABC DEF Gxx

    Args:
        iterable: Source of items.
        n: Number of items per chunk.
        fillvalue: Value used to pad the final chunk when the input length is
            not a multiple of ``n``.

    Returns:
        An iterator of ``n``-tuples.
    """
    # ``izip_longest`` (Python 2) was renamed ``zip_longest`` in Python 3.
    zip_longest = getattr(itertools, 'zip_longest', None) or itertools.izip_longest
    # n references to the *same* iterator: each output tuple pulls n
    # consecutive items from it.
    args = [iter(iterable)] * n
    return zip_longest(*args, fillvalue=fillvalue)
def integer(f):
    """Read an unsigned 32-bit little-endian integer from ``f``."""
    fmt = b'<I'
    return unpack(f, fmt)
def float64(f):
    """Read a 64-bit little-endian IEEE double from ``f``.

    The byte order is stated explicitly so the result does not depend on the
    host CPU, matching the other little-endian readers in this file.
    """
    return unpack(f, b'<d')
def byte(f):
    """Read a single unsigned byte from ``f`` and return it as an int."""
    fmt = b'B'
    return unpack(f, fmt)
def short(f):
    """Read an unsigned 16-bit little-endian integer from ``f``."""
    fmt = b'<H'
    return unpack(f, fmt)
def string(f):
    """Read bytes from ``f`` up to a NUL terminator or end of file.

    The terminating NUL (if any) is consumed but not included in the result.

    Returns:
        The bytes read before the terminator, as a byte string.
    """
    collected = []
    while True:
        ch = f.read(1)
        # Stop on end of file or on the NUL terminator.
        if ch == b'' or ch == b'\0':
            break
        collected.append(ch)
    return b''.join(collected)
def pointer(f, value):
    """Follow a 32-bit file offset and read a value stored there.

    Reads an offset from the current position, seeks to it, calls
    ``value(f)`` to decode the target, and then restores the stream to the
    position just after the offset field (even if decoding raises).

    Args:
        f: Seekable binary stream.
        value: Callable taking the stream and returning the decoded value.

    Returns:
        Whatever ``value(f)`` returns.
    """
    target = integer(f)
    resume_at = f.tell()
    f.seek(target)
    try:
        result = value(f)
    finally:
        f.seek(resume_at)
    return result
def date_time(f):
    """Read a 32-bit timestamp from ``f`` and return it as a datetime.

    The conversion uses ``datetime.fromtimestamp`` without a timezone, so the
    result is a naive datetime in local time.
    """
    timestamp = integer(f)
    return datetime.datetime.fromtimestamp(timestamp)
def array(f, size, value):
    """Read ``size`` consecutive values from ``f``.

    Args:
        f: Stream passed to ``value`` (and to ``size`` when it is callable).
        size: Either the number of elements, or a callable that reads the
            element count from ``f``.
        value: Callable taking the stream and returning one element.

    Returns:
        A list with one decoded element per iteration.
    """
    if callable(size):
        size = size(f)
    # ``range`` instead of ``xrange`` so this runs on Python 2 and 3 alike.
    return [value(f) for _ in range(size)]
def datum_shift(f):
    """Read two consecutive doubles from ``f`` as a datum shift.

    Returns:
        A dict with keys ``'north'`` (first value) and ``'east'`` (second).
    """
    north = float64(f)
    east = float64(f)
    return {'north': north, 'east': east}
def licence_information(f):
    """Read the licence information structure at the current position.

    Layout as read here: a 32-bit identifier, 8 skipped bytes, a pointer to a
    description string, 32 serial-number bytes, then 84 skipped bytes.

    Returns:
        A dict with keys ``'identifier'``, ``'license_description'`` and
        ``'serial_number'`` (a list of 32 ints).
    """
    identifier = integer(f)
    f.seek(8, os.SEEK_CUR)  # skipped, not interpreted by this reader
    description = pointer(f, string)
    serial = array(f, 32, byte)
    f.seek(84, os.SEEK_CUR)  # skipped, not interpreted by this reader
    return {
        'identifier': identifier,
        'license_description': description,
        'serial_number': serial,
    }
def digital_map_shop(f):
    """Read the digital map shop structure: a 32-bit size and a URL pointer.

    Returns:
        A dict with keys ``'size'`` and ``'url'`` (a byte string).
    """
    size = integer(f)
    url = pointer(f, string)
    return {'size': size, 'url': url}
def extended_data_structure(f):
    """Read the extended data structure at the current position.

    Every field is stored as a pointer; 8 bytes between the disk name and the
    licence information pointers are skipped.

    Returns:
        A dict with keys ``'map_type'``, ``'datum_shift'``, ``'disk_name'``,
        ``'license_information'``, ``'associated_data'`` and
        ``'digital_map_shop'``.
    """
    map_type = pointer(f, string)
    shift = pointer(f, datum_shift)
    disk_name = pointer(f, string)
    f.seek(8, os.SEEK_CUR)  # skipped, not interpreted by this reader
    licence = pointer(f, licence_information)
    associated = pointer(f, string)
    shop = pointer(f, digital_map_shop)
    return {
        'map_type': map_type,
        'datum_shift': shift,
        'disk_name': disk_name,
        'license_information': licence,
        'associated_data': associated,
        'digital_map_shop': shop,
    }
def map_outline_point(f):
    """Read one map outline point: two consecutive doubles.

    Returns:
        A dict with keys ``'latitude'`` (first value) and ``'longitude'``
        (second value).
    """
    latitude = float64(f)
    longitude = float64(f)
    return {'latitude': latitude, 'longitude': longitude}
def meta_data(f):
    """Read the metadata header from the current position of ``f``.

    Fields are read strictly in file order; string fields are stored as
    pointers and are returned as byte strings.

    Returns:
        A dict of header fields, including ``'width'`` and ``'height'``, the
        nested ``'extended_data_struct'`` dict, and ``'map_outline'``, a list
        of ``'map_outline_points'`` outline-point dicts.
    """
    data = {}

    # Fixed-size leading fields.
    for key in ('magic', 'version', 'width', 'height'):
        data[key] = integer(f)

    # Run of pointers to NUL-terminated strings.
    for key in (
        'long_title',
        'name',
        'identifier',
        'edition',
        'revision',
        'keywords',
        'copyright',
        'scale',
        'datum',
        'depths',
        'heights',
        'projection',
    ):
        data[key] = pointer(f, string)

    data['bit_field'] = integer(f)
    data['original_file_name'] = pointer(f, string)
    data['original_file_size'] = integer(f)
    data['original_file_creation_time'] = date_time(f)

    f.seek(4, 1)  # 4 bytes skipped, not interpreted by this reader

    data['extended_data_struct'] = pointer(f, extended_data_structure)
    outline_count = integer(f)
    data['map_outline_points'] = outline_count

    def read_outline(stream):
        # The outline is an array whose length was read just above.
        return array(stream, outline_count, map_outline_point)

    data['map_outline'] = pointer(f, read_outline)
    return data
def deinterlace(y):
    """Reverse the bit pattern of ``y``, zero-padded to at least 6 bits.

    Example: 0b110100 -> 0b001011.
    """
    bits = format(y, '06b')
    return int(bits[::-1], 2)
# Tiles are square: TILE_SIZE pixels wide and TILE_SIZE rows high.
TILE_SIZE = 64


def draw_image(f, data, drawing, x, y):
    """Decode the tile at the current position of ``f`` and paint it.

    The first byte selects the decoder: 0 or 0xff -> Huffman, any other value
    above 127 -> packed pixels, otherwise run-length encoding. Rows arrive in
    interlaced order and are mapped to their real position by
    ``deinterlace``.

    Decoding errors are reported on stdout (with a traceback) and swallowed,
    so one bad tile does not abort the whole image.

    Args:
        f: Seekable binary stream positioned at the start of the tile data.
        data: Parsed file data (currently unused by this function).
        drawing: Pixel-access object supporting ``drawing[col, row] = value``.
        x: Pixel column of the tile's left edge.
        y: Pixel row of the tile's top edge.
    """
    start = f.tell()
    b0 = byte(f)
    try:
        if b0 in (0, 0xff):
            pixels = decode_huffman(f)
        elif b0 > 127:
            pixels = decode_packed(f)
        else:
            pixels = decode_rle(f)
        for row in range(TILE_SIZE):
            target_row = deinterlace(row) + y
            for col in range(x, x + TILE_SIZE):
                # Builtin next() works on both Python 2 and Python 3.
                drawing[col, target_row] = next(pixels)
    except Exception as e:
        print('error drawing tile', start, ':', e)
        import traceback
        traceback.print_exc()
def decode_rle(f):
    """Yield palette indices of a run-length-encoded tile, endlessly.

    The stream must be positioned one byte past the tile's first byte; that
    byte is re-read as the sub-palette length. Each following data byte holds
    a sub-palette index in its low bits and a repeat count in the remaining
    high bits.

    The generator never finishes on its own: the caller takes as many pixels
    as it needs, and running out of data surfaces as the read error raised by
    ``byte``.

    Raises:
        ValueError: If the sub-palette length is zero (raised when the first
            pixel is requested).
    """
    f.seek(-1, 1)
    sub_palette_len = byte(f)
    sub_palette = array(f, sub_palette_len, byte)
    if not sub_palette_len:
        raise ValueError('empty sub-palette in RLE tile')
    # Number of bits needed to address every sub-palette entry.
    repeat_shift = (sub_palette_len - 1).bit_length()
    sub_palette_mask = (1 << repeat_shift) - 1
    while True:
        b = byte(f)
        color = sub_palette[b & sub_palette_mask]
        for _ in range(b >> repeat_shift):
            yield color
def iter_bits(f):
    """Yield the bits of ``f`` one at a time, endlessly.

    Bytes are read in order and each byte is emitted least-significant bit
    first. Running out of data surfaces as the read error raised by ``byte``.
    """
    while True:
        b = byte(f)
        for _ in range(8):
            # Once b reaches 0 this keeps yielding 0, so no special case is
            # needed for the remaining high bits.
            yield b & 1
            b >>= 1
def decode_code_book(f):
    """Read a tile's code book from ``f`` into a flat list.

    Each entry byte is appended as-is. Values below 128 are counted as
    colours; all other values are counted as branches. A value of exactly 128
    is additionally followed by a 16-bit number, stored as ``65539 - number``
    plus a ``None`` placeholder so the entry occupies three slots. Reading
    stops as soon as the colour count exceeds the branch count.

    Returns:
        The code book as a list of ints (and ``None`` placeholders).
    """
    code_book = []
    leaf_count = 0
    branch_count = 0
    while leaf_count <= branch_count:
        entry = byte(f)
        code_book.append(entry)
        if entry < 128:
            leaf_count += 1
            continue
        branch_count += 1
        if entry == 128:
            # Three-slot entry: marker, stored jump value, padding slot.
            code_book.extend((65539 - short(f), None))
    return code_book
def decode_huffman(f):
    """Yield palette indices of a Huffman-coded tile, endlessly.

    Reads the code book from ``f``, then decodes the bit stream that follows
    by walking the code book from index 0 for every pixel:

    * an entry below 128 is a colour and ends the walk;
    * otherwise one bit is consumed. A set bit jumps ahead by the stored
      value in the next slot (for a 128 entry) or by ``257 - entry``; a clear
      bit steps over the entry (3 slots for a 128 entry, else 1).

    A code book with a single entry means the whole tile is that one colour
    and no bits are read.
    """
    code_book = decode_code_book(f)
    if len(code_book) == 1:
        color = code_book[0]
        while True:
            yield color
    bits = iter_bits(f)
    while True:
        p = 0
        while True:
            code = code_book[p]
            if code < 128:
                break
            # Builtin next() works on both Python 2 and Python 3.
            if next(bits):
                if code == 128:
                    p += code_book[p + 1]
                else:
                    p += 257 - code
            else:
                if code == 128:
                    p += 3
                else:
                    p += 1
        yield code
def decode_packed(f):
    """Yield palette indices of a packed-pixel tile, endlessly.

    The stream must be positioned one byte past the tile's first byte; that
    byte is re-read as the sub-palette length. Pixels are then stored as
    fixed-width sub-palette indices packed into 32-bit little-endian words,
    lowest bits first; bits left over at the top of each word are ignored.

    Running out of data surfaces as the read error raised by ``integer``.

    Raises:
        ValueError: If the sub-palette length is zero (raised when the first
            pixel is requested).
    """
    f.seek(-1, 1)
    # NOTE(review): the first byte is used directly as the sub-palette length
    # (as in decode_rle). The QCT format description may define it as
    # ``256 - length`` for packed tiles -- confirm against the specification.
    sub_palette_len = byte(f)
    sub_palette = array(f, sub_palette_len, byte)
    if not sub_palette_len:
        raise ValueError('empty sub-palette in packed tile')
    # Number of bits needed to address every sub-palette entry.
    next_shift = (sub_palette_len - 1).bit_length()
    sub_palette_mask = (1 << next_shift) - 1
    total_per_int = 32 // next_shift
    while True:
        i = integer(f)
        for _ in range(total_per_int):
            yield sub_palette[i & sub_palette_mask]
            i >>= next_shift
def qct(f, filename):
    """Parse a QCT file and save its map image as a paletted image.

    Args:
        f: Seekable binary stream positioned at the start of the QCT data.
        filename: Path the rendered image is saved to; the image format is
            chosen by PIL from the file extension.

    Returns:
        A dict with the parsed ``'meta_data'``, ``'geo_refs'``, ``'palette'``,
        ``'interp_mat'`` and ``'image_index'`` sections.
    """
    data = {
        # meta data
        'meta_data': meta_data(f),
        # geographical referencing coefficients
        'geo_refs': array(f, 40, float64),
        # palette: 1024 bytes are consumed, but only the first 512 (128
        # four-byte entries in blue, green, red, unused order) are kept
        'palette': [(r, g, b) for b, g, r, _ in grouper(array(f, 1024, byte)[:512], 4)],
        # interpolation matrix
        'interp_mat': array(f, 16384, byte),
    }

    # Width and height are counted in tiles, not pixels.
    width = data['meta_data']['width']
    height = data['meta_data']['height']

    # image index pointers: one file offset per tile, row by row
    # TODO: check if image index is in qc3 file
    data['image_index'] = array(f, width * height, integer)

    # image
    image = Image.new('P', (width * TILE_SIZE, height * TILE_SIZE), None)
    image.putpalette(sum(data['palette'], ()))
    drawing = image.load()

    for y in range(height):
        for x in range(width):
            image_index = data['image_index'][(width * y) + x]
            # A zero offset means there is no data for this tile.
            if image_index:
                f.seek(image_index)
                draw_image(f, data, drawing, x * TILE_SIZE, y * TILE_SIZE)

    image.save(filename)
    return data
if __name__ == '__main__':
    # Usage: script.py FILE.qct -- writes FILE.png into the current directory.
    if len(sys.argv) < 2:
        sys.exit('usage: {} FILE.qct'.format(sys.argv[0]))
    source_path = sys.argv[1]
    name = os.path.splitext(os.path.basename(source_path))[0]
    with open(source_path, 'rb') as f:
        qct(f, name + '.png')
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment