-
-
Save zahlman/f362fb7beb3eddfa4924 to your computer and use it in GitHub Desktop.
Because Discourse corrupts indentation inside code tags, apparently.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from itertools import takewhile, cycle | |
MAX_MATCH_LENGTH = 18 | |
MAX_LOOKBEHIND = 1 << 12 | |
def length_of_match(data, x, y, limit):
    """Return the number of matching consecutive bytes starting at `x` and `y` in `data`.

    At most `limit` positions are compared; comparison also stops at the
    end of `data` (the slices simply come up short).
    """
    # Count lazily instead of building a throwaway list, and avoid
    # shadowing the parameter `x` inside the predicate.
    return sum(1 for _ in takewhile(
        lambda pair: pair[0] == pair[1],
        zip(data[x:x+limit], data[y:y+limit])
    ))
def search(data, start, position, limit):
    """Look for the best previous match in `data` for the data at `position`.

    Matches are ranked by greatest length, then by shortest distance away
    (i.e., by greatest offset to the match position). Returns a
    ``(length, offset)`` tuple, or ``(0, 0)`` when the search window is empty.
    """
    # The window never reaches back before `start`, nor further than the
    # format's 12-bit distance field allows.
    window_start = max(start, position - MAX_LOOKBEHIND)
    if window_start >= position:
        return (0, 0)
    best = (0, 0)
    for previous in range(window_start, position):
        candidate = (length_of_match(data, previous, position, limit), previous)
        # Tuple comparison ranks by length first, then by larger offset
        # (i.e. the closer match), exactly like max() over the pairs.
        if candidate > best:
            best = candidate
    return best
def compress_raw(data, start, size):
    """Generator yielding blocks of either a single byte to output literally
    when decompressing, or a two-byte code for compression."""
    end = start + size
    position = start
    while position < end:
        # Never match past the requested region, nor beyond what the
        # 4-bit length field can encode.
        limit = min(MAX_MATCH_LENGTH, end - position)
        length, location = search(data, start, position, limit)
        if length <= 2:
            # A match this short costs as much as the literals; emit one
            # literal byte instead.
            yield data[position:position+1]
            position += 1
            continue
        # Pack the match as <length-3:4 bits><distance-1:12 bits>, big end first.
        distance = position - location - 1
        length_code = length - 3
        assert length_code == length_code & 0xF
        assert distance == distance & 0xFFF
        yield bytes([
            (length_code << 4) | ((distance >> 8) & 0xF),
            distance & 0xFF
        ])
        position += length
    assert position == end
def compress_gen(data, start, size):
    """Generator yielding the compressed data, in multiple `bytes` objects."""
    # Header: magic 0x10 plus the decompressed size as 24-bit little-endian.
    yield bytes([0x10, size & 0xff, (size >> 8) & 0xff, (size >> 16) & 0xff])
    emitted = 0
    control = 0
    pending = b''
    count = 0
    # Group raw items into chunks of eight preceded by control bytes.
    # In the control byte, bits are set (MSB first) according to whether the
    # corresponding item is a compression code or a literal byte.
    for item in compress_raw(data, start, size):
        count += 1
        if len(item) == 2:
            # 0x100 >> 1 is bit 7 for the first item, down to bit 0 for the eighth.
            control |= 0x100 >> count
        pending += item
        if count == 8:
            # Finished a chunk; output control byte and chunk.
            yield bytes([control])
            yield pending
            emitted += 1 + len(pending)
            control, pending, count = 0, b'', 0
    # Emit any leftovers.
    if pending:
        yield bytes([control])
        yield pending
        emitted += 1 + len(pending)
    # Pad the stream (header excluded, which is already 4 bytes) to a
    # multiple of four bytes with zeroes.
    yield bytes(-emitted % 4)
def compress(data, start, size):
    """Compress `size` bytes of `data` beginning at `start` into one `bytes` object."""
    pieces = compress_gen(data, start, size)
    return b''.join(pieces)
def decompress(data, start):
    """Decompress the stream beginning at `data[start]`, returning a bytearray."""
    # Determine how much decompressed data to expect: the header is a magic
    # byte 0x10 followed by a 24-bit little-endian size.
    magic, low, mid, high = data[start:start+4]
    assert magic == 0x10
    size = (high << 16) | (mid << 8) | low
    output = bytearray()
    cursor = start + 4
    pending_flags = []
    # Main loop.
    while len(output) < size:
        if not pending_flags:
            control = data[cursor]
            cursor += 1
            # The most significant bit describes the first item of the chunk.
            pending_flags = [bool(control & (0x80 >> i)) for i in range(8)]
        # Check the next flag and handle it accordingly.
        is_code = pending_flags.pop(0)
        if is_code:
            # Interpret a compression code: 4-bit length, 12-bit distance.
            first = data[cursor]
            second = data[cursor + 1]
            cursor += 2
            copy_length = (first >> 4) + 3
            distance = ((first & 0xF) << 8) | second
            source = len(output) - distance - 1
            # Copy byte-by-byte so that matches overlapping the write
            # position ('wrap-around' behaviour) come out right.
            for offset in range(copy_length):
                output.append(output[source + offset])
        else:
            # Interpret a literal byte.
            output.append(data[cursor])
            cursor += 1
    assert len(output) == size
    # `cursor` may be less than len(data) due to padding.
    # In general, we won't know how much data to read anyway.
    return output
def test(source, compressed, verify):
    """Full demo of functionality: compress `source` into `compressed` with
    filler bytes interleaved, then decompress it into `verify`."""
    filler = b'SPAMSPAMSPAM'
    filler_size = len(filler)
    split_at = 169
    with open(source, 'rb') as infile:
        original = infile.read()
    with open(compressed, 'wb') as outfile:
        # Compress the file in two chunks by two methods,
        # and put some filler before them.
        outfile.write(filler)
        head = compress(original, 0, split_at)
        outfile.write(head)
        outfile.write(filler)
        tail_pieces = compress_gen(original, split_at, len(original) - split_at)
        for piece in tail_pieces:
            outfile.write(piece)
    with open(compressed, 'rb') as infile:
        stored = infile.read()
    with open(verify, 'wb') as outfile:
        # Restore the original data, by skipping the filler and
        # decompressing both chunks.
        outfile.write(decompress(stored, filler_size))
        outfile.write(decompress(stored, 2 * filler_size + len(head)))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment