Last active
September 27, 2024 17:53
-
-
Save lingeringwillx/9a78841e4e60085c23965e544d58d680 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Pure Python implementation of the RefPack/QFS compression algorithm.
# The compressor uses the first (nearest) match it finds.
class CompressionError(Exception):
    """Raised when compressing or decompressing a buffer fails."""
class header:
    """Identifiers for the compression header layouts handled by this module."""

    # 0x10 0xFB followed by a 3-byte big-endian uncompressed size
    RETRO = 0
    # 4-byte little-endian compressed size, 0x10 0xFB, then a 3-byte
    # big-endian uncompressed size
    Y2K = 1
    # 0x10 0xFB + 3-byte size, or 0x90 0xFB + 4-byte size for data larger
    # than 0xffffff bytes
    NEW = 2
def copy_bytes(src, src_pos, dst, dst_pos, length):
    """Copy ``length`` bytes from ``src[src_pos:]`` into ``dst[dst_pos:]``.

    The copy runs front to back, one byte at a time. When ``src`` and ``dst``
    are the same buffer and the ranges overlap, bytes written earlier in the
    call are read again later in the same call, so a short pattern is
    repeated. Do not replace this loop with a slice assignment: that would
    change the result for overlapping ranges.

    A non-positive ``length`` copies nothing. An index outside either buffer
    raises IndexError after the preceding bytes have been copied.
    """
    for i in range(length):
        dst[dst_pos + i] = src[src_pos + i]
#compresses the file | |
def compress(src, header_type):
    """Compress ``src`` with the RefPack/QFS scheme and return a bytearray.

    Parameters:
        src: bytes-like object whose items index as integers (bytes,
            bytearray, ...).
        header_type: one of ``header.RETRO``, ``header.Y2K`` or ``header.NEW``;
            selects the header layout written in front of the stream.

    Returns:
        A new bytearray holding the header followed by the compressed stream.

    Raises:
        ValueError: if ``header_type`` is unknown, or if ``len(src)`` does not
            fit in the size field of the chosen header (3 bytes for RETRO and
            Y2K, 4 bytes for NEW).
        CompressionError: if the output would not fit in the pre-sized buffer.
    """
    src_len = len(src)
    src_pos = 0

    # dst_pos starts right after the header; max_size is the largest value
    # the header's uncompressed-size field can hold. Using the exact field
    # maximum (rather than 1 << 24 / 1 << 32) rejects oversized input up
    # front instead of failing in to_bytes() after all the work is done.
    if header_type == header.RETRO:
        max_size = 0xffffff
        dst_pos = 5
    elif header_type == header.Y2K:
        max_size = 0xffffff
        dst_pos = 9
    elif header_type == header.NEW:
        max_size = 0xffffffff
        dst_pos = 5 if src_len <= 0xffffff else 6
    else:
        raise ValueError('Unknown header type')

    if src_len > max_size:
        raise ValueError('Buffer size is larger than expected')

    # likely the maximum possible compressed size
    dst = bytearray(dst_pos + src_len + src_len // 112 + 2)
    dst_len = len(dst)

    # maps the hash of a 4-byte sequence to the last position it was seen at
    table = [-1] * (1 << 17)

    i = 0
    last_match_location = 0

    while i <= src_len - 4:
        hash_value = (src[i] << 9) ^ (src[i + 1] << 6) ^ ((src[i + 2] << 3) + src[i + 3])
        prev_pos = table[hash_value]

        # no candidate, or the candidate is further back than an offset can encode
        if prev_pos == -1 or i - prev_pos > 131072:
            table[hash_value] = i
            i += 1
            continue

        match_position = prev_pos
        match_offset = i - match_position

        # extend the match forwards from i
        match_length = 0
        while (i + match_length < src_len
               and src[match_position + match_length] == src[i + match_length]):
            match_length += 1

        # extend the match backwards into the literals that are still pending
        left_match_length = 0
        if match_length > 0:
            max_left = min(match_position, i - last_match_location)
            while (left_match_length < max_left
                   and src[match_position - left_match_length - 1] == src[i - left_match_length - 1]):
                left_match_length += 1
            match_length += left_match_length

        # long matches are split into 1028-byte commands; drop a tail that
        # would be too short (< 5) to encode as a long command
        if match_length > 1028:
            remainder = match_length % 1028
            if remainder < 5:
                match_length -= remainder

        worthwhile = (
            (match_length >= 3 and match_offset <= 1024)
            or (match_length >= 4 and match_offset <= 16384)
            or match_length >= 5
        )
        if not worthwhile:
            table[hash_value] = i
            i += 1
            continue

        i -= left_match_length

        # flush pending literals in multiples of four; 0-3 remain and are
        # attached to the match command below
        src_pos, dst_pos, lit = _write_literal_runs(src, src_pos, dst, dst_pos, i - src_pos)

        cp = match_length
        offst = match_offset - 1

        if cp <= 10 and offst < 1024:
            if dst_pos + 2 + lit > dst_len:
                raise CompressionError('Maximum output size exceeded')

            # 0oocccpp oooooooo
            dst[dst_pos] = ((offst >> 3) & 0b01100000) + ((cp - 3) << 2) + lit
            dst[dst_pos + 1] = offst & 0xff
            dst_pos += 2

            copy_bytes(src, src_pos, dst, dst_pos, lit)
            src_pos += cp + lit
            dst_pos += lit

        elif cp <= 67 and offst < 16384:
            if dst_pos + 3 + lit > dst_len:
                raise CompressionError('Maximum output size exceeded')

            # 10cccccc ppoooooo oooooooo
            dst[dst_pos] = 0b10000000 + (cp - 4)
            dst[dst_pos + 1] = (lit << 6) + (offst >> 8)
            dst[dst_pos + 2] = offst & 0xff
            dst_pos += 3

            copy_bytes(src, src_pos, dst, dst_pos, lit)
            src_pos += cp + lit
            dst_pos += lit

        else:
            while cp >= 5:
                part = min(cp, 1028)

                if dst_pos + 4 + lit > dst_len:
                    raise CompressionError('Maximum output size exceeded')

                # 110occpp oooooooo oooooooo cccccccc
                dst[dst_pos] = (0b11000000 + ((offst >> 12) & 0b00010000)
                                + (((part - 5) >> 6) & 0b00001100) + lit)
                dst[dst_pos + 1] = (offst >> 8) & 0xff
                dst[dst_pos + 2] = offst & 0xff
                dst[dst_pos + 3] = (part - 5) & 0xff
                dst_pos += 4
                cp -= part

                copy_bytes(src, src_pos, dst, dst_pos, lit)
                src_pos += part + lit
                dst_pos += lit
                # only the first command of a split match carries literals
                lit = 0

        # record the positions covered by the match so later data can refer to them
        table[hash_value] = i
        for j in range(i + left_match_length, min(i + match_length, src_len - 3)):
            table[(src[j] << 9) ^ (src[j + 1] << 6) ^ ((src[j + 2] << 3) + src[j + 3])] = j

        i += match_length
        last_match_location = i

    # copy the remaining bytes at the end
    src_pos, dst_pos, lit = _write_literal_runs(src, src_pos, dst, dst_pos, src_len - src_pos)

    if lit > 0:
        if dst_pos + lit + 1 > dst_len:
            raise CompressionError('Maximum output size exceeded')

        # 111111pp
        dst[dst_pos] = 0b11111100 + lit
        dst_pos += 1

        copy_bytes(src, src_pos, dst, dst_pos, lit)
        dst_pos += lit

    # make the compression header
    compressed_size = dst_pos
    uncompressed_size = src_len

    if header_type == header.RETRO:
        dst[0] = 0x10
        dst[1] = 0xfb
        dst[2:5] = uncompressed_size.to_bytes(3, 'big')

    elif header_type == header.Y2K:
        dst[0:4] = compressed_size.to_bytes(4, 'little')
        dst[4] = 0x10
        dst[5] = 0xfb
        dst[6:9] = uncompressed_size.to_bytes(3, 'big')

    elif header_type == header.NEW:
        if uncompressed_size <= 0xffffff:
            dst[0] = 0x10
            dst[1] = 0xfb
            dst[2:5] = uncompressed_size.to_bytes(3, 'big')
        else:
            dst[0] = 0x90
            dst[1] = 0xfb
            dst[2:6] = uncompressed_size.to_bytes(4, 'big')

    return dst[:dst_pos]


def _write_literal_runs(src, src_pos, dst, dst_pos, literals):
    """Emit literal-only commands while more than three literals are pending.

    Each command copies a multiple of four bytes, at most 112. Returns the
    updated ``(src_pos, dst_pos, literals)``, where the returned ``literals``
    (0-3) is the count still to be written by the caller.

    Raises CompressionError if a command would not fit in ``dst``.
    """
    while literals > 3:
        lit = min(literals - literals % 4, 112)

        if dst_pos + lit + 1 > len(dst):
            raise CompressionError('Maximum output size exceeded')

        # 111ppp00
        dst[dst_pos] = 0b11100000 + ((lit - 4) >> 2)
        dst_pos += 1

        copy_bytes(src, src_pos, dst, dst_pos, lit)
        src_pos += lit
        dst_pos += lit
        literals -= lit

    return src_pos, dst_pos, literals
#decompresses the file | |
#raises a CompressionError if decompression fails | |
def decompress(src, header_type):
    """Decompress a RefPack/QFS buffer and return the data as a bytearray.

    Parameters:
        src: bytes-like object holding the header followed by the compressed
            stream.
        header_type: one of ``header.RETRO``, ``header.Y2K`` or ``header.NEW``;
            selects how the header is parsed.

    Returns:
        A bytearray whose length is the uncompressed size read from the header.

    Raises:
        ValueError: if ``header_type`` is unknown.
        CompressionError: if ``src`` is shorter than its header, a command is
            truncated, or a command reads or writes outside the buffers.
    """
    src_len = len(src)

    # Work out where the uncompressed-size field sits and how wide it is.
    # An empty buffer is treated as flags == 0 so that the length check below
    # reports it as a CompressionError instead of an IndexError.
    if header_type == header.RETRO:
        flags = src[0] if src_len else 0
        width = 3
        # bit 0 set: a compressed-size field precedes the uncompressed size
        offset = 5 if flags & 0b00000001 else 2

    elif header_type == header.Y2K:
        width = 3
        offset = 6

    elif header_type == header.NEW:
        flags = src[0] if src_len else 0
        # bit 7 set: size fields are 4 bytes wide instead of 3
        width = 4 if flags & 0b10000000 else 3
        offset = 2
        # bit 0 set: a compressed-size field precedes the uncompressed size
        if flags & 0b00000001:
            offset += width

    else:
        raise ValueError('Unknown header type')

    src_pos = offset + width
    if src_pos > src_len:
        raise CompressionError('Failed to decompress the file')

    uncompressed_size = int.from_bytes(src[offset:src_pos], 'big')

    dst_pos = 0
    dst = bytearray(uncompressed_size)

    while src_pos < src_len:
        b1 = src[src_pos]
        src_pos += 1

        if b1 < 0x80:
            if src_pos + 1 > src_len:
                raise CompressionError('Failed to decompress the file')

            b2 = src[src_pos]
            src_pos += 1

            # 0oocccpp oooooooo
            lit = b1 & 0b00000011  # 0-3
            cp = ((b1 & 0b00011100) >> 2) + 3  # 3-10
            offst = ((b1 & 0b01100000) << 3) + b2 + 1  # 1-1024

        elif b1 < 0xc0:
            if src_pos + 2 > src_len:
                raise CompressionError('Failed to decompress the file')

            b2 = src[src_pos]
            b3 = src[src_pos + 1]
            src_pos += 2

            # 10cccccc ppoooooo oooooooo
            lit = (b2 & 0b11000000) >> 6  # 0-3
            cp = (b1 & 0b00111111) + 4  # 4-67
            offst = ((b2 & 0b00111111) << 8) + b3 + 1  # 1-16384

        elif b1 < 0xe0:
            if src_pos + 3 > src_len:
                raise CompressionError('Failed to decompress the file')

            b2 = src[src_pos]
            b3 = src[src_pos + 1]
            b4 = src[src_pos + 2]
            src_pos += 3

            # 110occpp oooooooo oooooooo cccccccc
            lit = b1 & 0b00000011  # 0-3
            cp = ((b1 & 0b00001100) << 6) + b4 + 5  # 5-1028
            offst = ((b1 & 0b00010000) << 12) + (b2 << 8) + b3 + 1  # 1-131072

        elif b1 < 0xfc:
            # 111ppp00
            lit = ((b1 & 0b00011111) << 2) + 4  # 4-112
            cp = 0
            offst = 0

        else:
            # 111111pp (stop command)
            lit = b1 & 0b00000011  # 0-3
            cp = 0
            offst = 0

        if src_pos + lit > src_len or dst_pos + lit > len(dst):
            raise CompressionError('Failed to decompress the file')

        copy_bytes(src, src_pos, dst, dst_pos, lit)
        src_pos += lit
        dst_pos += lit

        if offst > dst_pos or dst_pos + cp > len(dst):
            raise CompressionError('Failed to decompress the file')

        # copy bytes from an earlier location in the decompressed output;
        # the ranges may overlap, which copy_bytes handles by copying forwards
        copy_bytes(dst, dst_pos - offst, dst, dst_pos, cp)
        dst_pos += cp

        # the stop command ends the stream; anything after it (e.g. padding)
        # is not compressed data and must not be decoded
        if b1 >= 0xfc:
            break

    return dst
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment