#Pure Python implementation of the Refpack/QFS compression algorithm
#uses the first nearest match
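
#RefPack control codes, summarized from the encoder and decoder below
#(o = offset bits, c = copy length bits, p = literal count bits):
#0oocccpp oooooooo                      - copy 3-10 bytes from offset 1-1024, with 0-3 preceding literals
#10cccccc ppoooooo oooooooo             - copy 4-67 bytes from offset 1-16384, with 0-3 preceding literals
#110occpp oooooooo oooooooo cccccccc    - copy 5-1028 bytes from offset 1-131072, with 0-3 preceding literals
#111ppp00                               - run of 4-112 literals (multiple of 4)
#111111pp                               - final run of 0-3 literals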

class CompressionError(Exception): pass

class header:
    RETRO = 0
    Y2K = 1
    NEW = 2
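
#header layouts, as written by compress() and parsed by decompress() below:
#RETRO - 0x10 0xfb magic followed by a 3-byte big-endian uncompressed size
#Y2K   - 4-byte little-endian compressed size, then 0x10 0xfb and a 3-byte big-endian uncompressed size
#NEW   - 0x10 0xfb (or 0x90 0xfb if the size exceeds 0xffffff) followed by a 3-byte or 4-byte big-endian uncompressed size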

#copies bytes one at a time
#a slice copy would not work during decompression, where a back-reference may overlap the bytes being written
def copy_bytes(src, src_pos, dst, dst_pos, length):
    for i in range(length):
        dst[dst_pos + i] = src[src_pos + i]

#compresses the file
def compress(src, header_type):
    src_pos = 0

    if header_type == header.RETRO:
        max_size = 1 << 24
        dst_pos = 5
    elif header_type == header.Y2K:
        max_size = 1 << 24
        dst_pos = 9
    elif header_type == header.NEW:
        max_size = 1 << 32

        if len(src) <= 0xffffff:
            dst_pos = 5
        else:
            dst_pos = 6
    else:
        raise ValueError('Unknown header type')

    if len(src) > max_size:
        raise ValueError('Buffer size is larger than expected')

    #likely the maximum possible compressed size
    dst = bytearray(b'\x00' * (dst_pos + len(src) + len(src) // 112 + 2))

    #hash table mapping the hash of 4 bytes to the last position where they were seen
    table = [-1] * (1 << 17)

    i = 0
    last_match_location = 0

    while i <= len(src) - 4:
        hash_value = (src[i] << 9) ^ (src[i + 1] << 6) ^ (src[i + 2] << 3) + src[i + 3]
        prev_pos = table[hash_value]

        if prev_pos != -1:
            match_position = prev_pos
            match_offset = i - match_position
            match_length = 0
            left_match_length = 0

            #the copy commands cannot encode offsets larger than 131072
            if match_offset > 131072:
                table[hash_value] = i
                i += 1
                continue

            #extend the match forwards
            for j in range(0, len(src) - i):
                if src[match_position + j] == src[i + j]:
                    match_length += 1
                else:
                    break

            if match_length > 0:
                #extend the match backwards over pending bytes that have not been emitted yet
                for j in range(1, min(match_position, i - last_match_location) + 1):
                    if src[match_position - j] == src[i - j]:
                        left_match_length += 1
                    else:
                        break

                match_length += left_match_length

            #the long copy command encodes at most 1028 bytes per chunk, so trim the match if the final chunk would be shorter than 5 bytes
            if match_length > 1028:
                remainder = match_length % 1028
                if remainder < 5:
                    match_length -= remainder

            if (match_length >= 3 and match_offset <= 1024) or (match_length >= 4 and match_offset <= 16384) or match_length >= 5:
                i -= left_match_length
                literals = i - src_pos

                #emit pending literals in runs of 4-112 until at most 3 remain
                while literals > 3:
                    lit = min(literals - literals % 4, 112)

                    if dst_pos + lit + 1 > len(dst):
                        raise CompressionError('Maximum output size exceeded')

                    #111ppp00
                    dst[dst_pos] = 0b11100000 + ((lit - 4) >> 2)
                    dst_pos += 1

                    copy_bytes(src, src_pos, dst, dst_pos, lit)
                    src_pos += lit
                    dst_pos += lit
                    literals -= lit

                cp = match_length
                offst = match_offset - 1
                lit = literals

                if cp <= 10 and offst < 1024:
                    #short copy command
                    if dst_pos + 2 > len(dst):
                        raise CompressionError('Maximum output size exceeded')

                    #0oocccpp oooooooo
                    dst[dst_pos] = ((offst >> 3) & 0b01100000) + ((cp - 3) << 2) + lit
                    dst[dst_pos + 1] = offst & 0xff
                    dst_pos += 2

                    copy_bytes(src, src_pos, dst, dst_pos, lit)
                    src_pos += cp + lit
                    dst_pos += lit

                elif cp <= 67 and offst < 16384:
                    #medium copy command
                    if dst_pos + 3 > len(dst):
                        raise CompressionError('Maximum output size exceeded')

                    #10cccccc ppoooooo oooooooo
                    dst[dst_pos] = 0b10000000 + (cp - 4)
                    dst[dst_pos + 1] = (lit << 6) + (offst >> 8)
                    dst[dst_pos + 2] = offst & 0xff
                    dst_pos += 3

                    copy_bytes(src, src_pos, dst, dst_pos, lit)
                    src_pos += cp + lit
                    dst_pos += lit

                else:
                    #long copy command, split into chunks of at most 1028 bytes
                    while cp >= 5:
                        part = min(cp, 1028)

                        if dst_pos + 4 > len(dst):
                            raise CompressionError('Maximum output size exceeded')

                        #110occpp oooooooo oooooooo cccccccc
                        dst[dst_pos] = 0b11000000 + ((offst >> 12) & 0b00010000) + (((part - 5) >> 6) & 0b00001100) + lit
                        dst[dst_pos + 1] = (offst >> 8) & 0xff
                        dst[dst_pos + 2] = offst & 0xff
                        dst[dst_pos + 3] = (part - 5) & 0xff
                        dst_pos += 4

                        cp -= part

                        copy_bytes(src, src_pos, dst, dst_pos, lit)
                        src_pos += part + lit
                        dst_pos += lit
                        lit = 0

                #update the hash table for the positions covered by the match
                table[hash_value] = i

                for j in range(i + left_match_length, min(i + match_length, len(src) - 3)):
                    table[(src[j] << 9) ^ (src[j + 1] << 6) ^ (src[j + 2] << 3) + src[j + 3]] = j

                i += match_length
                last_match_location = i
            else:
                table[hash_value] = i
                i += 1

        else:
            table[hash_value] = i
            i += 1

    #copy the remaining bytes at the end
    literals = len(src) - src_pos

    while literals > 3:
        lit = min(literals - literals % 4, 112)

        if dst_pos + lit + 1 > len(dst):
            raise CompressionError('Maximum output size exceeded')

        #111ppp00
        dst[dst_pos] = 0b11100000 + ((lit - 4) >> 2)
        dst_pos += 1

        copy_bytes(src, src_pos, dst, dst_pos, lit)
        src_pos += lit
        dst_pos += lit
        literals -= lit

    lit = literals

    if literals > 0:
        if dst_pos + lit + 1 > len(dst):
            raise CompressionError('Maximum output size exceeded')

        #111111pp
        dst[dst_pos] = 0b11111100 + lit
        dst_pos += 1

        copy_bytes(src, src_pos, dst, dst_pos, lit)
        dst_pos += lit

    #make the compression header
    compressed_size = dst_pos
    uncompressed_size = len(src)

    if header_type == header.RETRO:
        dst[0] = 0x10
        dst[1] = 0xfb
        dst[2:5] = uncompressed_size.to_bytes(3, 'big')
    elif header_type == header.Y2K:
        dst[0:4] = compressed_size.to_bytes(4, 'little')
        dst[4] = 0x10
        dst[5] = 0xfb
        dst[6:9] = uncompressed_size.to_bytes(3, 'big')
    elif header_type == header.NEW:
        if uncompressed_size <= 0xffffff:
            dst[0] = 0x10
            dst[1] = 0xfb
            dst[2:5] = uncompressed_size.to_bytes(3, 'big')
        else:
            dst[0] = 0x90
            dst[1] = 0xfb
            dst[2:6] = uncompressed_size.to_bytes(4, 'big')

    dst = dst[:dst_pos]
    return dst

#decompresses the file
#raises a CompressionError if decompression fails
def decompress(src, header_type):
    if header_type == header.RETRO:
        flags = src[0]

        if flags & 0b00000001:
            offset = 5
        else:
            offset = 2

        uncompressed_size = int.from_bytes(src[offset:offset + 3], 'big')
        src_pos = offset + 3

    elif header_type == header.Y2K:
        uncompressed_size = int.from_bytes(src[6:9], 'big')
        src_pos = 9

    elif header_type == header.NEW:
        flags = src[0]

        if flags & 0b10000000:
            width = 4
        else:
            width = 3

        offset = 2

        if flags & 0b00000001:
            offset += width

        uncompressed_size = int.from_bytes(src[offset:offset + width], 'big')
        src_pos = offset + width

    else:
        raise ValueError('Unknown header type')

    dst_pos = 0
    dst = bytearray(b'\x00' * uncompressed_size)

    while src_pos < len(src):
        b1 = src[src_pos]
        src_pos += 1

        if b1 < 0x80:
            if src_pos + 1 > len(src):
                raise CompressionError('Failed to decompress the file')

            b2 = src[src_pos]
            src_pos += 1

            #0oocccpp oooooooo
            lit = b1 & 0b00000011 #0-3
            cp = ((b1 & 0b00011100) >> 2) + 3 #3-10
            offst = ((b1 & 0b01100000) << 3) + b2 + 1 #1-1024

        elif b1 < 0xc0:
            if src_pos + 2 > len(src):
                raise CompressionError('Failed to decompress the file')

            b2 = src[src_pos]
            b3 = src[src_pos + 1]
            src_pos += 2

            #10cccccc ppoooooo oooooooo
            lit = (b2 & 0b11000000) >> 6 #0-3
            cp = (b1 & 0b00111111) + 4 #4-67
            offst = ((b2 & 0b00111111) << 8) + b3 + 1 #1-16384

        elif b1 < 0xe0:
            if src_pos + 3 > len(src):
                raise CompressionError('Failed to decompress the file')

            b2 = src[src_pos]
            b3 = src[src_pos + 1]
            b4 = src[src_pos + 2]
            src_pos += 3

            #110occpp oooooooo oooooooo cccccccc
            lit = b1 & 0b00000011 #0-3
            cp = ((b1 & 0b00001100) << 6) + b4 + 5 #5-1028
            offst = ((b1 & 0b00010000) << 12) + (b2 << 8) + b3 + 1 #1-131072

        elif b1 < 0xfc:
            #111ppp00
            lit = ((b1 & 0b00011111) << 2) + 4 #4-112
            cp = 0
            offst = 0

        else:
            #111111pp
            lit = b1 & 0b00000011 #0-3
            cp = 0
            offst = 0

        if src_pos + lit > len(src) or dst_pos + lit > len(dst):
            raise CompressionError('Failed to decompress the file')

        copy_bytes(src, src_pos, dst, dst_pos, lit)
        src_pos += lit
        dst_pos += lit

        if offst > dst_pos or dst_pos + cp > len(dst):
            raise CompressionError('Failed to decompress the file')

        #copy bytes from an earlier location in the decompressed output
        copy_bytes(dst, dst_pos - offst, dst, dst_pos, cp)
        dst_pos += cp

    return dst
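
#example round-trip usage (an added illustrative sketch, not part of the original code;
#the payload and the file names in the commented-out lines are hypothetical)
if __name__ == '__main__':
    payload = b'an example payload ' * 100

    compressed = compress(payload, header.NEW)
    restored = decompress(compressed, header.NEW)

    assert bytes(restored) == payload

    #reading and writing files would look like this:
    #with open('data.bin', 'rb') as f:
    #    compressed = compress(f.read(), header.NEW)
    #with open('data.qfs', 'wb') as f:
    #    f.write(compressed)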