zahlman/lz77.py
Created November 30, 2015 03:58
Because Discourse corrupts indentation inside code tags, apparently.
from itertools import takewhile, cycle

MAX_MATCH_LENGTH = 18
MAX_LOOKBEHIND = 1 << 12
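# These limits follow from the code format used below: a compression code
# stores the match length in a 4-bit field as (length - 3), allowing matches
# of 3 to 18 bytes, and the backward distance in a 12-bit field as
# (distance - 1), allowing distances of 1 to 4096 bytes.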

def length_of_match(data, x, y, limit):
    """The number of matching consecutive characters starting at `x` and `y` in `data`."""
    return len(list(takewhile(
        lambda pair: pair[0] == pair[1],
        zip(data[x:x+limit], data[y:y+limit])
    )))
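
# A quick worked example on hypothetical data:
#     length_of_match(b'abcabcX', 0, 3, MAX_MATCH_LENGTH)  # -> 3
# compares b'abcabcX' (from offset 0) with b'abcX' (from offset 3) and stops
# at the fourth pair, where 'a' != 'X'.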

def search(data, start, position, limit):
    """Look for the best previous match in `data` for the data at `position`.

    Matches are ranked by greatest length, then by shortest distance away
    (i.e., by greatest offset to the match position)."""
    candidates = range(
        max(start, position - MAX_LOOKBEHIND),
        position
    )
    return max(
        (length_of_match(data, previous, position, limit), previous)
        for previous in candidates
    ) if candidates else (0, 0)
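
# The ranking works because `max` compares the (length, offset) tuples
# lexicographically: with hypothetical values, if offsets 10 and 50 both
# match for 4 bytes, (4, 50) beats (4, 10), so the nearer match wins.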

def compress_raw(data, start, size):
    """Generator yielding blocks of either a single byte to output literally
    when decompressing, or a two-byte code for compression."""
    position = start
    while position < start + size:
        limit = min(MAX_MATCH_LENGTH, start + size - position)
        match_length, match_location = search(data, start, position, limit)
        if match_length > 2:
            # Matches shorter than 3 bytes aren't worth a two-byte code.
            # Pack (length - 3) into the high nibble of the first byte and
            # (distance - 1) into the remaining 12 bits.
            encoded_distance = position - match_location - 1
            encoded_length = match_length - 3
            assert encoded_length & 0xF == encoded_length
            assert encoded_distance & 0xFFF == encoded_distance
            yield bytes([
                (encoded_length << 4) + ((encoded_distance >> 8) & 0xF),
                encoded_distance & 0xFF
            ])
            position += match_length
        else:
            yield data[position:position+1]
            position += 1
    assert position == start + size
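
# Worked example with made-up numbers: a 5-byte match found 7 bytes back
# gives encoded_length = 5 - 3 = 2 and encoded_distance = 7 - 1 = 6, so the
# code emitted is bytes([0x20, 0x06]):
#     first byte:  (2 << 4) + (6 >> 8)  ->  0x20
#     second byte: 6 & 0xFF             ->  0x06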

def compress_gen(data, start, size):
    """Generator yielding the compressed data, in multiple `bytes` objects."""
    # Header: a 0x10 type byte, then the decompressed size as a 24-bit
    # little-endian integer.
    yield bytes([0x10, size & 0xff, (size >> 8) & 0xff, (size >> 16) & 0xff])
    output_size = 0
    flags = 0
    chunk = b''
    # Group raw items into chunks of eight preceded by control bytes.
    # In the control byte, bits are set according to whether the corresponding
    # item is a compression code or a literal byte.
    for flag, item in zip(cycle((128, 64, 32, 16, 8, 4, 2, 1)), compress_raw(data, start, size)):
        chunk += item
        if len(item) == 2:
            flags |= flag
        if flag == 1:
            # Finished a chunk; output the control byte, then the chunk.
            yield bytes([flags])
            yield chunk
            output_size += 1 + len(chunk)
            flags = 0
            chunk = b''
    # Emit any leftovers.
    if chunk:
        yield bytes([flags])
        yield chunk
        output_size += 1 + len(chunk)
    # Pad with zero bytes so that the total output (including the 4-byte
    # header) is a multiple of 4 bytes long.
    yield bytes(-output_size % 4)
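
# Worked example with made-up values: for size = 0x4000 the header is
# bytes([0x10, 0x00, 0x40, 0x00]). If, within a group of eight items, only
# the first and fourth are two-byte codes, the control byte is
# 128 | 16 == 0b10010000 == 0x90.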

def compress(data, start, size):
    return b''.join(compress_gen(data, start, size))

def decompress(data, start):
    # Determine how much decompressed data to expect.
    header = data[start:start+4]
    assert header[0] == 0x10
    size = (header[3] << 16) | (header[2] << 8) | header[1]
    result = bytearray()
    position = start + 4
    # Main loop.
    flags = []
    while len(result) < size:
        if not flags:
            flag_byte = data[position]
            position += 1
            flags = [bool((flag_byte << i) & 0x80) for i in range(8)]
        # Check the next flag and handle it accordingly.
        flag, *flags = flags
        if flag:
            # Interpret a compression code.
            first, second = data[position:position+2]
            position += 2
            match_length = (first >> 4) + 3
            encoded_distance = ((first & 0xF) << 8) | second
            match_location = len(result) - encoded_distance - 1
            # Copy byte by byte rather than slicing, so that matches which
            # overlap the current end of the output ('wrap-around') are
            # handled correctly.
            for i in range(match_length):
                result.append(result[match_location + i])
        else:
            # Interpret a literal byte.
            result.append(data[position])
            position += 1
    assert len(result) == size
    # Position may be less than len(data) due to padding.
    # In general, we won't know how much data to read anyway.
    return result
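
# Note how overlapping matches fall out of the byte-by-byte copy: with a
# hypothetical code where encoded_distance = 0 and match_length = 5, each
# appended byte immediately becomes the source of the next, so if `result`
# ends in b'A' this appends b'AAAAA' -- effectively run-length encoding.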

def test(source, compressed, verify):
    """Full demo of functionality."""
    junk = b'SPAMSPAMSPAM'
    junk_size = len(junk)
    chunk_size = 169
    with open(source, 'rb') as f:
        data = f.read()
    with open(compressed, 'wb') as f:
        # Compress the file in two chunks by two methods,
        # and put some filler before them.
        f.write(junk)
        first = compress(data, 0, chunk_size)
        f.write(first)
        f.write(junk)
        for item in compress_gen(data, chunk_size, len(data) - chunk_size):
            f.write(item)
    with open(compressed, 'rb') as f:
        data = f.read()
    with open(verify, 'wb') as f:
        # Restore the original data, by skipping the filler and
        # decompressing both chunks.
        f.write(decompress(data, junk_size))
        f.write(decompress(data, 2 * junk_size + len(first)))
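
# A minimal sketch of how the demo might be invoked; the filenames are
# placeholders, and the input file is assumed to exist and to be longer
# than chunk_size (169) bytes.
if __name__ == '__main__':
    test('input.bin', 'output.lz77', 'roundtrip.bin')
    with open('input.bin', 'rb') as f1, open('roundtrip.bin', 'rb') as f2:
        assert f1.read() == f2.read()  # the round trip should be lossless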