Skip to content

Instantly share code, notes, and snippets.

@alxbl
Last active October 22, 2020 07:52
Show Gist options
  • Save alxbl/5e66dd069c11841fff13ff77c7ec047e to your computer and use it in GitHub Desktop.
Encoder for TES Arena's MIF file format (type08) records
# This script provides a rudimentary data encoding algorithm for
# type08 records in MIF files.
#
# It's not exactly Huffman-coding or adaptive Huffman coding, but it
# behaves very similarly. The developers call it "Delta Mode".
#
# @author alxbl (alex@segfault.me)
#
# https://en.wikipedia.org/wiki/File:Huffman_coding_visualisation.svg
from sys import exit
from io import BytesIO
from struct import unpack, pack
# Valid chunk headers.
HEADERS = ["MAP1", "MAP2", "FLOR"]

# fmt: off
# Lookup tables for history buffer offsets.
#
# `offset_hi[b]` gives the high 6 bits of a history offset for the 8-bit
# prefix `b`, and `offset_lo[b]` gives the total number of bits (3..8) the
# encoder spent on that prefix; the decoder reads `offset_lo[b] - 2` extra
# bits for the low part (see `decompress` / `_write_offset`).
# NOTE(review): these look like the classic LZHUF d_code/d_len tables —
# shorter prefixes for nearer offsets — but that naming is inferred, not
# stated in the source.
offset_hi = [
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x01, 0x01, 0x01, 0x01, 0x01, 0x01, 0x01, 0x01, 0x01, 0x01, 0x01, 0x01, 0x01, 0x01, 0x01, 0x01,
0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02,
0x03, 0x03, 0x03, 0x03, 0x03, 0x03, 0x03, 0x03, 0x03, 0x03, 0x03, 0x03, 0x03, 0x03, 0x03, 0x03,
0x04, 0x04, 0x04, 0x04, 0x04, 0x04, 0x04, 0x04, 0x05, 0x05, 0x05, 0x05, 0x05, 0x05, 0x05, 0x05,
0x06, 0x06, 0x06, 0x06, 0x06, 0x06, 0x06, 0x06, 0x07, 0x07, 0x07, 0x07, 0x07, 0x07, 0x07, 0x07,
0x08, 0x08, 0x08, 0x08, 0x08, 0x08, 0x08, 0x08, 0x09, 0x09, 0x09, 0x09, 0x09, 0x09, 0x09, 0x09,
0x0A, 0x0A, 0x0A, 0x0A, 0x0A, 0x0A, 0x0A, 0x0A, 0x0B, 0x0B, 0x0B, 0x0B, 0x0B, 0x0B, 0x0B, 0x0B,
0x0C, 0x0C, 0x0C, 0x0C, 0x0D, 0x0D, 0x0D, 0x0D, 0x0E, 0x0E, 0x0E, 0x0E, 0x0F, 0x0F, 0x0F, 0x0F,
0x10, 0x10, 0x10, 0x10, 0x11, 0x11, 0x11, 0x11, 0x12, 0x12, 0x12, 0x12, 0x13, 0x13, 0x13, 0x13,
0x14, 0x14, 0x14, 0x14, 0x15, 0x15, 0x15, 0x15, 0x16, 0x16, 0x16, 0x16, 0x17, 0x17, 0x17, 0x17,
0x18, 0x18, 0x19, 0x19, 0x1A, 0x1A, 0x1B, 0x1B, 0x1C, 0x1C, 0x1D, 0x1D, 0x1E, 0x1E, 0x1F, 0x1F,
0x20, 0x20, 0x21, 0x21, 0x22, 0x22, 0x23, 0x23, 0x24, 0x24, 0x25, 0x25, 0x26, 0x26, 0x27, 0x27,
0x28, 0x28, 0x29, 0x29, 0x2A, 0x2A, 0x2B, 0x2B, 0x2C, 0x2C, 0x2D, 0x2D, 0x2E, 0x2E, 0x2F, 0x2F,
0x30, 0x31, 0x32, 0x33, 0x34, 0x35, 0x36, 0x37, 0x38, 0x39, 0x3A, 0x3B, 0x3C, 0x3D, 0x3E, 0x3F
]
offset_lo = [
0x03, 0x03, 0x03, 0x03, 0x03, 0x03, 0x03, 0x03, 0x03, 0x03, 0x03, 0x03, 0x03, 0x03, 0x03, 0x03,
0x03, 0x03, 0x03, 0x03, 0x03, 0x03, 0x03, 0x03, 0x03, 0x03, 0x03, 0x03, 0x03, 0x03, 0x03, 0x03,
0x04, 0x04, 0x04, 0x04, 0x04, 0x04, 0x04, 0x04, 0x04, 0x04, 0x04, 0x04, 0x04, 0x04, 0x04, 0x04,
0x04, 0x04, 0x04, 0x04, 0x04, 0x04, 0x04, 0x04, 0x04, 0x04, 0x04, 0x04, 0x04, 0x04, 0x04, 0x04,
0x04, 0x04, 0x04, 0x04, 0x04, 0x04, 0x04, 0x04, 0x04, 0x04, 0x04, 0x04, 0x04, 0x04, 0x04, 0x04,
0x05, 0x05, 0x05, 0x05, 0x05, 0x05, 0x05, 0x05, 0x05, 0x05, 0x05, 0x05, 0x05, 0x05, 0x05, 0x05,
0x05, 0x05, 0x05, 0x05, 0x05, 0x05, 0x05, 0x05, 0x05, 0x05, 0x05, 0x05, 0x05, 0x05, 0x05, 0x05,
0x05, 0x05, 0x05, 0x05, 0x05, 0x05, 0x05, 0x05, 0x05, 0x05, 0x05, 0x05, 0x05, 0x05, 0x05, 0x05,
0x05, 0x05, 0x05, 0x05, 0x05, 0x05, 0x05, 0x05, 0x05, 0x05, 0x05, 0x05, 0x05, 0x05, 0x05, 0x05,
0x06, 0x06, 0x06, 0x06, 0x06, 0x06, 0x06, 0x06, 0x06, 0x06, 0x06, 0x06, 0x06, 0x06, 0x06, 0x06,
0x06, 0x06, 0x06, 0x06, 0x06, 0x06, 0x06, 0x06, 0x06, 0x06, 0x06, 0x06, 0x06, 0x06, 0x06, 0x06,
0x06, 0x06, 0x06, 0x06, 0x06, 0x06, 0x06, 0x06, 0x06, 0x06, 0x06, 0x06, 0x06, 0x06, 0x06, 0x06,
0x07, 0x07, 0x07, 0x07, 0x07, 0x07, 0x07, 0x07, 0x07, 0x07, 0x07, 0x07, 0x07, 0x07, 0x07, 0x07,
0x07, 0x07, 0x07, 0x07, 0x07, 0x07, 0x07, 0x07, 0x07, 0x07, 0x07, 0x07, 0x07, 0x07, 0x07, 0x07,
0x07, 0x07, 0x07, 0x07, 0x07, 0x07, 0x07, 0x07, 0x07, 0x07, 0x07, 0x07, 0x07, 0x07, 0x07, 0x07,
0x08, 0x08, 0x08, 0x08, 0x08, 0x08, 0x08, 0x08, 0x08, 0x08, 0x08, 0x08, 0x08, 0x08, 0x08, 0x08
]
# fmt: on
class Type08:
    """
    Encapsulate the codec state.

    The scheme combines a 4096-byte LZ-style history window (pre-filled
    with spaces) with an adaptive prefix tree of 314 symbols that is
    rebalanced after every symbol, so encoder and decoder stay in sync
    without transmitting the tree. Symbols 0..255 are literal bytes;
    symbols 256..313 encode history-run lengths of 3..60.

    NOTE(review): the source of this file had its indentation stripped;
    block structure below is a reconstruction from the code's logic and
    comments — confirm against the original gist where noted.
    """

    def __init__(self):
        # Initialize the lookup tree: per-slot frequencies, the flattened
        # tree array, and the node-value -> tree-slot index map.
        self.freqs = self._gen_freq()
        self.tree = self._gen_tree()
        self.indices = self._gen_indices()
        self.nfreqs = len(self.freqs)
        self.nnodes = len(self.tree)
        self.nindices = len(self.indices)
        # Initialize blank history. (0x20 is an ASCII space; 0x1000 = 4096.)
        self.history = bytearray(b"\x20" * 0x1000)
        # Running count of bytes written to history; masked with 0xFFF on access.
        self.hpos = 0
        # Bitstream work area: a 16-bit bit buffer (`bits`, packed toward
        # bit 15) and the number of valid bits currently in it (`avail`).
        self.bits = 0
        self.avail = 0

    def decompress(self, data: BytesIO, rlen: int) -> bytes:
        """
        Decompress a type08 bitstream.

        :param data: the compressed bitstream (chunk header already stripped).
        :param rlen: the expected decompressed length in bytes.
        :return: the decompressed payload.

        Ported to Python from
        https://github.com/afritz1/OpenTESArena/blob/master/OpenTESArena/src/Assets/Compression.h
        """
        pos = 0
        dst = []
        while pos < rlen:
            # STEP 1: Read a prefix-code from the bitstream.
            node = self.tree[626]  # The root of the tree.
            # Traverse the prefix tree according to the code bits until a leaf is found.
            while node < 627:  # 0-626 are internal nodes, anything above 626 is a leaf.
                branch = self._read(data, 1)
                node = self.tree[node + branch]
            code = node - 627
            # STEP 2: Process the code retrieved from the prefix tree.
            if code <= 0xFF:
                # 2.1. Code represents one literal byte of data.
                dst.append(code)
                self.history[self.hpos & 0x0FFF] = code
                self.hpos += 1
                pos += 1
            else:
                # 2.2. This code encodes a history lookup.
                # The next 8 bits select the offset tables; together they
                # encode the 12-bit history offset, and `code` itself
                # carries the number of bytes to output.
                offset = self._read(data, 8)
                hi = offset_hi[offset] << 6  # High 6 bits of the offset.
                # Read the required number of bits for the low part of the offset.
                lo = offset
                nbits = offset_lo[offset] - 2
                # Only keep 6 lowest bits.
                lo = ((lo << nbits) | self._read(data, nbits)) & 0x3F
                # Offsets are relative to the current write position, minus one.
                start = self.hpos - (hi | lo) - 1
                nbytes = code - 256 + 3  # Run lengths 3..60.
                for i in range(nbytes):
                    # Copy from history, re-appending each byte to history
                    # so overlapping runs work like LZ77.
                    b = self.history[(start + i) & 0x0FFF]
                    dst.append(b)
                    self.history[self.hpos & 0x0FFF] = b
                    self.hpos += 1
                    pos += 1
            # STEP 3: Update the occurrence count of the code and rebalance
            # the tree (keeps encoder and decoder trees identical).
            self._update(node)
        return bytes(dst)

    def compress(self, data: bytes) -> bytes:
        """
        Compress a raw stream using Type08.

        The algorithm is a hybrid between LZ and some form of adaptive
        coding. Here's how it works, generally speaking:

        The symbol size is 314; symbols 0..255 are for direct byte
        mapping. There is a 4096-byte history buffer where raw bytes are
        written. If a symbol above 255 is encountered, the decoder must
        process the symbol along with 8 additional bits of data that
        encode an offset in the history table along with a number of bytes
        to copy from the history table, similar to LZ runs.

        For the adaptive coding, the prefix-tree is updated after each
        symbol is encountered, including history-lookup symbols that
        the encoder outputs.

        :param data: the raw bytes to compress.
        :return: the compressed bitstream (without the chunk header).
        """
        dst = BytesIO()
        rlen = len(data)
        pos = 0
        i = 0  # NOTE(review): symbol counter; written but never read.
        while pos < rlen:
            byte = data[pos]
            # STEP 1: Find the longest matching run in the history buffer.
            start, size = self._scan(data, pos)
            if size < 3:  # Not enough data for a run; emit a literal.
                symbol = byte + 627
                size = 1
            else:
                # Encode the run length in the symbol (256 - 313)
                symbol = size - 3 + 256 + 627
            # STEP 2: Lookup the prefix code for the symbol and output it.
            # (`_lookup` also updates the tree, mirroring the decoder.)
            code, codesz = self._lookup(symbol)
            self._write(dst, code, codesz)
            # STEP 3. Encode history run offset.
            if size > 1:
                offset = (self.hpos & 0xFFF) - start - 1
                self._write_offset(dst, offset)
            # STEP 4: Update history buffer with the bytes just covered.
            for _ in range(size):
                self.history[self.hpos & 0x0FFF] = data[pos]
                self.hpos += 1
                pos += 1
            i += 1
        # Write any leftover bits that don't make up a byte.
        self._flush(dst)
        return dst.getvalue()

    def _read(self, src: BytesIO, n: int) -> int:
        """
        Read a number of bits from a compressed bitstream.
        The bitstream is packed left to right (MSB first).

        :param src: the compressed stream to read bytes from.
        :param n: the number of bits to consume.
        :return: the bits, right-aligned in an int.
        """
        # Ensure we have enough bits in the work area.
        result = 0
        while n > 0:
            # Ensure that the bit buffer is filled (up to 16 bits).
            while self.avail < 9:
                b = src.read(1)
                if b:
                    self.bits |= b[0] << (8 - self.avail)
                # NOTE(review): reconstructed indentation — `avail` is
                # advanced even at EOF so the fill loop terminates and
                # zero bits are shifted in; confirm against the original.
                self.avail += 8
            # Consume the next bit (bit 15) and append it to the result.
            result <<= 1
            result |= (self.bits >> 15) & 1
            self.bits = (self.bits << 1) & 0xFFFF
            self.avail -= 1
            n -= 1
        return result

    def _write(self, s: BytesIO, byte: int, n: int, ltr=False):
        """
        Write bits to the stream.

        The bits are left to right if `ltr` is True, otherwise they are read from right to left,
        as this is the natural way that `_lookup` will return them.

        :param s: the destination stream.
        :param byte: the bits to write, right-aligned.
        :param n: the number of bits of `byte` to write.
        :param ltr: emit most-significant-first when True.
        """
        while n > 0:
            # Flush a full byte out of the 16-bit work area if needed.
            if self.avail > 8:
                s.write(bytes([self.bits >> 8]))
                self.bits = (self.bits << 8) & 0xFFFF
                self.avail -= 8
            # Append one bit to the stream.
            if ltr:
                bit = (byte >> (n - 1)) & 1
            else:
                bit = byte & 1
                byte >>= 1
            self.bits |= bit << (15 - self.avail)
            self.avail += 1
            n -= 1

    def _flush(self, s: BytesIO):
        """Drain any buffered bits, zero-padding the final byte."""
        # FIXME: Throw on write after flushing.
        while self.avail > 0:
            s.write(bytes([self.bits >> 8]))
            self.bits = (self.bits << 8) & 0xFFFF
            self.avail -= 8

    def _write_offset(self, dst: BytesIO, offset: int):
        """
        Emit a 12-bit history offset using the variable-length prefix
        from `offset_hi`/`offset_lo` (mirrors the decoder's STEP 2.2).

        :param dst: the destination stream.
        :param offset: the 12-bit backward offset to encode.
        """
        hi = offset & 0xFC0  # High 6 bits.
        lo = offset & 0x3F   # Low 6 bits.
        # First table slot whose high part matches; the prefix's low bits
        # are then filled in from `lo`.
        idx = offset_hi.index(hi >> 6)
        used = offset_lo[idx]  # Number of bits used to encode the high bit index.
        left = 8 - used  # Number of bits left in `idx` to encode low bits.
        encodable = lo >> (6 - left)
        idx += encodable
        self._write(dst, idx, 8, ltr=True)
        # The remaining (6 - left) low bits follow the 8-bit prefix.
        self._write(dst, lo, 6 - left, ltr=True)

    def _scan(self, data: bytes, pos: int) -> (int, int):
        """
        Find the longest run in the history buffer matching `data[pos:]`.

        :param data: the raw input being compressed.
        :param pos: the current position in `data`.
        :return: (start, size) — history start slot and match length
                 (size == 1 means "no usable run").
        """
        mstart = 0
        msize = 1  # Keep track of optimal result.
        dlen = len(data)
        # Optimized version: Start from hpos-1 and look for the first byte,
        # and try to move as far as possible.
        for h in range(self.hpos & 0xFFF, 0, -1):
            h -= 1
            # Find the latest occurrence of the target byte in the history buffer.
            if h < 0 or self.history[h] != data[pos]:
                continue
            # Now move forward, matching as many bytes as possible.
            start = h
            size = 0
            # `delta` lets a short history run repeat (overlapping match),
            # like the decoder's copy loop.
            delta = self.hpos - start
            i = 0
            while (
                pos + i < dlen
                and data[pos + i] == self.history[(start + (i % delta)) & 0xFFF]
                and size < 60  # Maximum encodable run length.
                and start + size < (self.hpos & 0xFFF)  # Don't go past hpos.
            ):
                i += 1
                size += 1
            if size > msize:
                msize = size
                mstart = start
        return mstart, msize

    def _update(self, node: int):
        """
        Increment the frequency for the given node (a leaf value >= 627)
        and rebalance the tree.

        This sorts the tree and must be done after each symbol
        is processed to ensure that the prefix-codes are adjusted
        identically on both the encoder and decoder sides.
        """
        freq_idx = self.indices[node]  # Look up node slot in the frequency map.
        while True:
            self.freqs[freq_idx] += 1
            freq = self.freqs[freq_idx]
            next_idx = freq_idx + 1
            if next_idx < self.nfreqs and self.freqs[next_idx] < freq:
                # Find the last slot whose frequency is still below `freq`.
                while next_idx < self.nfreqs and self.freqs[next_idx] < freq:
                    next_idx += 1
                next_idx -= 1
                # Swap the current frequency with the one found above.
                self.freqs[freq_idx] = self.freqs[next_idx]
                self.freqs[next_idx] = freq
                # Swap the tree's nodes.
                tmp = self.tree[next_idx]
                self.tree[next_idx] = self.tree[freq_idx]
                self.tree[freq_idx] = tmp
                # Update index lookup table (internal nodes occupy a pair).
                idx = self.tree[next_idx]
                self.indices[idx] = next_idx
                if idx < 627:
                    self.indices[idx + 1] = next_idx
                idx = self.tree[freq_idx]
                self.indices[idx] = freq_idx
                if idx < 627:
                    self.indices[idx + 1] = freq_idx
                freq_idx = next_idx
            # Recurse: move up to the parent slot; slot 0 is mapped from
            # the root (slot 626), terminating the walk.
            freq_idx = self.indices[freq_idx]
            if freq_idx == 0:
                break

    def _lookup(self, symbol: int) -> (int, int):
        """
        Lookup the prefix code for a symbol.
        The code bits are in reverse order (`87654321`).

        :param symbol: the leaf value (627 + raw symbol) to encode.
        :return: (code, code_size)
        """
        node = self.tree.index(symbol)
        code = 0  # Bits that make up the code.
        codesz = 0  # Number of bits that make up the prefix code.
        # Traverse the tree from the leaf to the root node,
        # building a prefix code along the way.
        #
        # There is probably a clever way to lookup the parent
        # in O(1) that I'm too lazy to figure out.
        while node != 626:
            # The code size cannot be more than log2(n) = 10 bits (n == 314)
            code <<= 1
            code |= node & 1  # Slot parity selects the branch bit.
            codesz += 1
            # Locate the parent node (its value is the even child slot).
            node = self.tree.index(node & ~1)
        # Update the occurrence count for the symbol, as the decoder will.
        self._update(symbol)
        return code, codesz

    def _gen_indices(self):
        """Build the node-value -> tree-slot map (941 entries)."""
        # Slots 0..625: parent slot of tree position x is (x >> 1) + 314.
        nodes = [(x >> 1) + 314 for x in range(626)]
        nodes.append(0)  # idx[626]
        # Leaf values 627..940 map back to tree slots 0..313.
        nodes += [x for x in range(941 - len(nodes))]
        return nodes

    def _gen_tree(self):
        """Build the initial flattened tree: 314 leaves then 313 internal nodes."""
        tree = [x + 627 for x in range(314)]
        # Internal node values are the (even) slot of their first child.
        tree += [x * 2 for x in range(627 - len(tree))]
        return tree

    def _gen_freq(self):
        """Build initial frequencies: 1 per leaf, parents sum their children."""
        freqs = [1 for _ in range(314)]
        n = 0
        for i in range(314, 627):
            val = freqs[n] + freqs[n + 1]
            n += 2
            freqs.append(val)
        return freqs
def main():
    """Command-line driver: compress or decompress one chunk file.

    Exactly one of --compress / --decompress must be given; the result is
    written to --output. Compressed files carry an 8-byte header:
    4-byte chunk type, little-endian compressed length (+2), and
    little-endian decompressed length.
    """
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument("-c", "--compress", type=str, help="The file to compress")
    parser.add_argument(
        "-d", "--decompress", type=str, help="The file to decompress. Also requires -s"
    )
    parser.add_argument(
        "-t",
        "--type",
        type=str,
        choices=HEADERS,
        default=HEADERS[0],
        help="The type of chunk. Must be specified when compressing.",
    )
    parser.add_argument("-o", "--output", help="The destination file", required=True)
    args = parser.parse_args()

    # Exactly one mode may be selected: reject both-set and neither-set.
    if bool(args.compress) == bool(args.decompress):
        print("Error: Specify exactly one of --decompress or --compress.")
        exit(2)

    codec = Type08()
    data = b""
    if args.compress:
        if not args.type:
            print("Must specify --type when compressing.")
            exit(2)
        with open(args.compress, "rb") as infile:
            raw = infile.read()
        data = codec.compress(raw)
        print(f"Decompressed Size = {len(raw):04X} ({len(raw)})")
        print(f"Compressed Size = {len(data):04X} ({len(data)})")
        # Prepend the 8-byte chunk header.
        header = args.type.encode() + pack("<H", len(data) + 2) + pack("<H", len(raw))
        data = header + data
    elif args.decompress:
        with open(args.decompress, "rb") as infile:
            blob = infile.read()
        chunk_type = blob[:4].decode()
        if chunk_type not in HEADERS:
            print("Invalid Chunk: Must start with one of: " + " ".join(HEADERS))
            exit(2)
        (clen,) = unpack("<H", blob[4:6])
        (rlen,) = unpack("<H", blob[6:8])
        # The stored length includes its own 2-byte field.
        payload = blob[8:8 + clen - 2]
        data = codec.decompress(BytesIO(payload), rlen)
        print(f"Compressed Size = {len(payload):04X} ({clen - 2})")
        print(f"Decompressed Size = {len(data):04X} ({len(data)})")

    print(f"Writing {args.output}")
    with open(args.output, "wb") as out:
        out.write(data)
    print("OK")


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment