Created
October 6, 2021 00:30
-
-
Save arizvisa/1ec05efb12fe1b10c0096b61c7237f99 to your computer and use it in GitHub Desktop.
RFC2118 (found in an old home directory)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import ptypes | |
from ptypes import * | |
from ptypes.bitmap import RBitmap, WBitmap | |
import six, array, math | |
class Encode(object):
    """Writers for the RFC 2118 (MPPC) bit-level token formats.

    Each method appends one token to ``wb`` through ``wb.push(value, bits)``
    and returns the sum of whatever those ``push`` calls return (presumably
    the number of bits written -- confirm against the bitmap writer in use).
    """

    @classmethod
    def byte(cls, wb, by):
        """Encode the literal byte ``by``.

        Values below 0x80 are pushed unchanged in 8 bits. Values with the
        high bit set are pushed as the 2-bit marker ``10`` followed by the
        low 7 bits.

        Raises:
            ValueError: if ``by`` is outside 0..0xff.
        """
        if not 0 <= by <= 0xff:
            raise ValueError(by)
        if by & 0x80:
            return wb.push(0b10, 2) + wb.push(by & 0x7f, 7)
        return wb.push(by, 8)

    @classmethod
    def offset(cls, wb, offset):
        """Encode a copy-token offset using the shortest available form.

        ======================  ========  ==========================
        offset range            marker    payload
        ======================  ========  ==========================
        0x0000 <= x < 0x0040    ``1111``  6 bits of ``x``
        0x0040 <= x < 0x0140    ``1110``  8 bits of ``x - 0x40``
        0x0140 <= x < 0x2000    ``110``   13 bits of ``x - 0x140``
        ======================  ========  ==========================

        Raises:
            ValueError: if ``offset`` is negative or 0x2000 and above.
        """
        # A negative offset would otherwise fall into the first branch and be
        # pushed as a negative payload.
        if offset < 0:
            raise ValueError(offset)
        if offset < 0x40:
            return wb.push(0b1111, 4) + wb.push(offset, 6)
        if offset < 0x140:
            return wb.push(0b1110, 4) + wb.push(offset - 0x40, 8)
        if offset < 0x2000:
            return wb.push(0b110, 3) + wb.push(offset - 0x140, 13)
        raise ValueError(offset)

    @classmethod
    def length(cls, wb, length):
        """Encode a copy-token length in the range 3..0x1fff.

        A length in ``[2**n, 2**(n+1))`` is written as an ``n``-bit marker
        made of ``n - 1`` one bits and a terminating zero bit, followed by the
        low ``n`` bits of the length. Length 3 is the single bit ``0`` with no
        payload.

        Raises:
            ValueError: if ``length`` is below 3 or 0x2000 and above.
        """
        if length < 3 or length >= 0x2000:
            raise ValueError(length)

        # Smallest bit count whose bucket [2**bits, 2**(bits+1)) holds the
        # length; 3 shares the 1-bit bucket that would nominally be 2..3.
        bits = 1
        while length >= (2 << bits):
            bits += 1

        mask = (1 << bits) - 1
        marker = mask ^ 1
        written = wb.push(marker, bits)
        if bits > 1:
            written += wb.push(length & mask, bits)
        return written

    @classmethod
    def tuple(cls, wb, offset, length):
        """Encode a complete copy token: the offset followed by the length."""
        return cls.offset(wb, offset) + cls.length(wb, length)
class Decode(object):
    """Readers for the RFC 2118 (MPPC) bit-level token formats.

    ``input`` is any bit reader offering ``consume(bits)`` (returns the next
    ``bits`` bits as an integer) and ``size()`` (the bits still unread, as
    used by the ``__main__`` self-tests of this file).
    """

    @classmethod
    def byteX(cls, input, prefix):
        """Read the 7 payload bits of a literal.

        ``prefix`` is the marker that was already consumed: falsy for the
        ``0`` form (byte below 0x80), truthy for the ``10`` form (byte with
        the high bit set).
        """
        low = input.consume(7)
        return (low | 0x80) if prefix else (low & 0x7f)

    @classmethod
    def offsetX(cls, input, bits):
        """Read an offset payload of ``bits`` bits and undo its bias.

        Raises:
            ValueError: if ``bits`` is not 6, 8 or 13.
        """
        if bits == 6:       # 1111 xxxxxx
            return input.consume(6)
        if bits == 8:       # 1110 xxxxxxxx
            return 0x40 + input.consume(8)
        if bits == 13:      # 110 xxxxxxxxxxxxx
            return 0x140 + input.consume(13)
        raise ValueError(bits)

    @classmethod
    def lengthX(cls, input, bits):
        """Read a length payload for a marker that was ``bits`` bits long.

        A 1-bit marker carries no payload and always means length 3.

        Raises:
            ValueError: if ``bits`` is outside 1..13.
        """
        if bits < 1 or bits > 13:
            raise ValueError(bits)
        low = 1 if bits == 1 else input.consume(bits)
        return low + 2 ** bits

    @classmethod
    def offset(cls, input):
        """Read the rest of an offset marker, then the offset itself.

        The caller has already consumed the leading ``11`` that identifies a
        copy token, so only the distinguishing marker bits remain.
        """
        if not input.consume(1):
            return cls.offsetX(input, 13)
        if input.consume(1):
            return cls.offsetX(input, 6)
        return cls.offsetX(input, 8)

    @classmethod
    def length(cls, input):
        """Read a length marker (one bits closed by a zero bit) and payload.

        Raises:
            ValueError: if no zero bit appears within the first 12 bits.
        """
        for count in range(1, 13):
            if not input.consume(1):
                break
        else:
            raise ValueError('Error decoding length (missing terminal bit)')
        return cls.lengthX(input, count)

    @classmethod
    def tuple(cls, input):
        """Read a copy token body and return ``(offset, length)``."""
        offset = cls.offset(input)
        length = cls.length(input)
        return offset, length

    @classmethod
    def decode(cls, history, input):
        """Decode every token in ``input`` and return an ``array('B')``.

        ``history`` must provide ``push(data)`` and ``get(offset, length)``;
        every decoded byte is pushed into it so later copy tokens can refer
        back to it.
        """
        result = array.array('B')

        # The shortest token is an 8-bit literal, so fewer than 8 remaining
        # bits can only be padding.
        while input.size() >= 8:
            if not input.consume(1):            # 0 xxxxxxx
                by = cls.byteX(input, 0b0)
            elif not input.consume(1):          # 10 xxxxxxx
                by = cls.byteX(input, 0b10)
            else:                               # 11 ...
                offset, length = cls.tuple(input)
                data = history.get(offset, length)
                # history.get() hands back a byte string; bytearray() turns
                # it into integers on both Python 2 and Python 3.
                result.extend(bytearray(data))
                history.push(data)
                continue
            result.append(by)
            history.push(by)
        return result
class History(object):
    """Fixed-size circular byte buffer used as the codec's sliding window.

    Attributes:
        buffer: ``array('B')`` holding the window contents.
        offset: index in ``buffer`` where the next byte will be written.
        amount: number of bytes written so far, capped at the buffer size.

    Attributes not found on the instance are looked up on ``buffer``.
    """

    def __init__(self, length=0x2000):
        self.offset, self.amount = 0, 0
        self.buffer = array.array('B', [0] * length)

    @staticmethod
    def _as_bytes(value):
        """Return ``value`` (byte string, bytearray, array or ints) as bytes."""
        return bytes(bytearray(value))

    def copy(self):
        """Return an independent instance with the same contents and cursor."""
        cls = self.__class__
        res = cls()
        res.offset, res.amount = self.offset, self.amount
        # Slice assignment also resizes the fresh buffer to match this one.
        res.buffer[:] = self.buffer
        return res

    def size(self):
        """Return the capacity of the window in bytes."""
        return len(self.buffer)

    def reset(self):
        """Clear the window and return the previous buffer.

        The write cursor and fill count are rewound as well; otherwise the
        zeroed bytes in front of the cursor would still be searched as if
        they were real history.
        """
        previous = self.buffer
        self.buffer = array.array('B', [0] * len(previous))
        self.offset, self.amount = 0, 0
        return previous

    def __getattr__(self, name):
        # Only reached when normal lookup fails. Without this guard an
        # instance that has no ``buffer`` yet would recurse forever.
        if name == 'buffer':
            raise AttributeError(name)
        return getattr(self.buffer, name)

    def get(self, offset, length):
        """Return ``length`` bytes starting ``offset`` bytes behind the cursor.

        The read wraps around the end of the buffer. When ``length`` exceeds
        ``offset`` the copy overlaps the bytes it is producing, so the
        ``offset`` source bytes are repeated cyclically.

        Raises:
            ValueError: if ``offset`` is below 1 or above the buffer size.
        """
        size = len(self.buffer)
        if offset < 1 or offset > size:
            raise ValueError(offset)
        start = (self.offset - offset) % size
        picked = bytearray(
            self.buffer[(start + position % offset) % size]
            for position in range(length)
        )
        return bytes(picked)

    def push(self, data):
        """Append ``data`` to the window and return the number of bytes added.

        ``data`` may be a byte string, a bytearray, a text string (each
        character contributes its ordinal), any iterable of such values, or a
        single integer.

        Raises:
            ValueError: if ``data`` is none of the above.
        """
        if isinstance(data, (bytes, bytearray)):
            data = bytearray(data)
        elif isinstance(data, type(u'')):
            data = [ord(character) for character in data]

        if isinstance(data, (bytearray, tuple, list)) or hasattr(data, '__iter__'):
            return sum(self.push(item) for item in data)

        try:
            self.buffer[self.offset] = data
        except TypeError:
            raise ValueError(data)
        return self.adjust(1)

    def adjust(self, count):
        """Advance the write cursor by ``count`` bytes, wrapping at the end."""
        size = len(self.buffer)
        self.amount = min(self.amount + count, size)
        self.offset = (self.offset + count) % size
        return count

    def find(self, string, offset=0):
        """Return the index of the first match of ``string`` at or after ``offset``.

        The whole buffer is searched, not just the part before the cursor.

        Raises:
            ValueError: if ``string`` is empty or does not occur.
        """
        needle = self._as_bytes(string)
        index = self._as_bytes(self.buffer).find(needle, offset) if needle else -1
        if index < 0:
            raise ValueError('substring not found')
        return index

    def find_latest(self, string, offset=0):
        """Return the index of the last match of ``string`` at or after ``offset``.

        Raises:
            ValueError: if ``string`` is empty or does not occur.
        """
        needle = self._as_bytes(string)
        index = self._as_bytes(self.buffer).rfind(needle, offset) if needle else -1
        if index < 0:
            raise ValueError('substring not found')
        return index

    def recent(self, string, offset=0):
        """Return the start index of the latest match that ends before the cursor.

        Only ``buffer[:self.offset]`` is searched. A non-zero ``offset``
        additionally excludes that many of the newest bytes from the search.

        Raises:
            ValueError: if ``string`` is empty or does not occur.
        """
        needle = self._as_bytes(string)
        end = max(self.offset - offset, 0)
        index = self._as_bytes(self.buffer).rfind(needle, 0, end) if needle else -1
        if index < 0:
            raise ValueError('substring not found')
        return index

    def longest(self, data):
        """Find the longest prefix of ``data`` (3 bytes or more) in the window.

        Returns:
            ``(index, length)`` of the most recent occurrence of that prefix,
            or an empty tuple when not even the first 3 bytes occur.
        """
        best = ()
        for length in range(3, 1 + len(data)):
            try:
                index = self.recent(data[:length])
            except ValueError:
                # A prefix that is missing cannot reappear when extended.
                break
            best = (index, length)
        return best

    def __repr__(self):
        cls, data = self.__class__, self._as_bytes(self.buffer[:self.offset])
        return '\n'.join(["{!s} length={:#x}".format(cls, len(self.buffer)), ptypes.utils.hexdump(data)])
def compress(history, data):
    """Compress ``data`` against ``history`` and return the serialized bits.

    Args:
        history: sliding window providing ``longest``, ``push``, ``size`` and
            the ``offset`` write cursor. It is updated with every input byte.
        data: the bytes to compress (byte string, bytearray or array of bytes).

    Returns:
        Whatever ``WBitmap.serialize()`` produces for the emitted tokens.
    """
    # Work on integers regardless of the Python version or input type.
    data = bytearray(data)

    # Encode.length() rejects anything from 0x2000 up, so never ask for more.
    longest_match = 0x2000 - 1

    index, result = 0, WBitmap()
    while index < len(data):
        match = history.longest(data[index : index + longest_match])

        # If a match was found, encode it as an (offset, length) copy token.
        if match:
            offset, length = match
            # ``offset`` is an absolute position inside the window, while the
            # token stores the distance back from the window's write cursor.
            # The input index only equals that cursor for an empty history.
            distance = (history.offset - offset) % history.size()
            Encode.tuple(result, distance, length)
            consumed = data[index : index + length]

        # Otherwise emit the current byte as a literal.
        else:
            Encode.byte(result, data[index])
            consumed = data[index]

        index += history.push(consumed)
    return result.serialize()
def decompress(history, data):
    """Decompress the token stream ``data`` against ``history``.

    Args:
        history: sliding window providing ``get(offset, length)`` and
            ``push``. It is updated with every decoded byte.
        data: the compressed byte string, handed to ``RBitmap``.

    Returns:
        The decoded bytes as a byte string.
    """
    stream, result = RBitmap(data), bytearray()

    # The shortest token (a literal below 0x80) takes 8 bits, so fewer than 8
    # remaining bits can only be padding. The previous ``>= 0`` test could
    # never become false.
    while stream.size() >= 8:
        if not stream.consume(1):           # 0 xxxxxxx
            by = Decode.byteX(stream, 0b0)
        elif not stream.consume(1):         # 10 xxxxxxx
            by = Decode.byteX(stream, 0b10)
        else:                               # 11 x...
            offset, length = Decode.tuple(stream)
            chunk = history.get(offset, length)
            result.extend(bytearray(chunk))
            history.push(chunk)
            continue
        result.append(by)
        history.push(by)
    return bytes(result)
class rfc2118(ptype.encoded_t):
    """``ptype.encoded_t`` whose encoded form is RFC 2118 compressed data.

    Unless a ``history`` attribute is already present after the base class
    initializer runs, each instance creates its own ``History`` window.
    """

    def __init__(self, **attrs):
        super(rfc2118, self).__init__(**attrs)
        if not hasattr(self, 'history'):
            self.history = History()

    def flush(self):
        """Reset the compression history and return ``self``."""
        # The instance attribute is ``history``; ``self.History`` does not exist.
        self.history.reset()
        return self

    def encode(self, object, **attrs):
        """Compress ``object.serialize()`` and pass it on as a block.

        A copy of the history is used, so ``self.history`` is not modified.
        """
        cdata = compress(self.history.copy(), object.serialize())
        res = ptype.block(length=len(cdata)).set(cdata)
        return super(rfc2118, self).encode(res)

    def decode(self, object, **attrs):
        """Decompress ``object.serialize()`` and pass it on as a block.

        A copy of the history is used, so ``self.history`` is not modified.
        """
        data = decompress(self.history.copy(), object.serialize())
        res = ptype.block(length=len(data)).set(data)
        return super(rfc2118, self).decode(res)
if __name__ == '__main__':
    # Ad-hoc smoke tests. Every ``if '<name>':`` guard is a non-empty string
    # and therefore always true; the strings only label the sections.
    # NOTE(review): the nesting below was reconstructed from a listing that
    # had lost its indentation -- confirm against the original file.
    import binascii
    import random
    import sys

    try:
        reload
    except NameError:  # Python 3 moved reload() into importlib
        from importlib import reload

    if 'wbitmap-test':
        self = WBitmap()
        print(binascii.hexlify(self.data.tostring()))
        self.push(0x1, 1)
        self.push(0x1, 1)
        self.push(0x1, 1)
        self.push(0x1, 1)
        self.push(0x7, 3)
        self.push(0x1, 1)
        self.push(0b1000001010000100100001101000100111, 32 + 3)

        # Earlier revisions assigned several other samples here (the unhexed
        # form of a similar string, b'\xcc' * 15, shorter variants); only this
        # last assignment ever took effect.
        data = b'a6532994ca6532994ca6532994ca65324c'

        # Push each byte as a literal token into both bitmap implementations.
        B = ptypes.bitmap
        b, self = B.zero, WBitmap()
        for by in bytearray(data):
            bits, integer = (9, 0x100 | (by & 0x7f)) if by & 0x80 else (8, by)
            self.push(integer, bits)
            b = B.push(b, B.new(integer, bits))
        print('{} {}'.format(self, bin(self.int())))
        print('{} {}'.format(B.repr(b), B.string(b)))

    if 'compression-test':
        history = History()
        res = compress(history, b'for whom the bell tolls, the bell tolls for thee.')
        # Presumably the expected token sequence, written as <offset,length>:
        #   'for whom the bell tolls,<16,15> <40,4><19,3>e.'
        print(res)

    if 'encode-length-test':
        res = WBitmap()
        print(Encode.length(res, 3))
        print(res)

    if 'compression-test-large':
        x = History()
        data = b'\xcc' * 0x2000
        res = compress(x, data)
        print(len(res))
        print(repr(res))

    if 'rbitmap-test':
        sys.path.append('.')
        import mppc
        reload(mppc)
        from mppc import *

        # Consume a known integer in random-sized pieces and compare each
        # piece with the matching slice of its binary representation.
        n = 0x2bb2b2b2abcd3ff2100874666abdef
        bn, hn = (F(n)[2:].rstrip('L') for F in (bin, hex))
        bn = bn.zfill(len(hn) * 4)
        self = RBitmap(binascii.unhexlify(hn))
        res, count = 0, 0
        while self.size() > 0:
            count = min((self.size(), random.randint(1, 32)))
            res = self.consume(count)
            print('{} {}'.format(count, bn[:count] == bin(res)[2:].zfill(count)))
            bn = bn[count:]

    if 'decompression-table':
        print('byte')
        print('{} {}'.format(bin(0)[2:], bin(2**7 - 1)[2:]))
        print('{} {}'.format(bin(2)[2:], bin(2**7 - 1)[2:]))
        print('offset')
        print('{} {}'.format(bin(0b1111)[2:], bin(2**6 - 1)[2:]))
        print('{} {}'.format(bin(0b1110)[2:], bin(2**8 - 1)[2:]))
        print('{} {}'.format(bin(0b0110)[2:], bin(2**13 - 1)[2:]))
        print('length')
        for bits in range(1, 13):
            res = 2 ** bits
            size, mask = 2 * res, res - 1
            pattern = mask ^ 1
            print('{} {} {}'.format(bin(pattern)[2:], bin(mask)[2:], bin(size)))

    if 'decompression-test':
        history = History()
        res = compress(history, b'for whom the bell tolls, the bell tolls for thee.')
        print(res)
        history = History()
        res = decompress(history, res)
        print(res)

    if 0 and 'decompression-test-huge':
        history = History()
        with open('packetlog.txt', 'rt') as handle:
            data = handle.read()
        res = compress(history, data)
        history = History()
        # compress() already returns the serialized stream and decompress()
        # returns a plain byte string, so neither result needs converting.
        res = decompress(history, res)
        print(data[-16:])
        print(res[-16:])

    # NOTE(review): these three statements may originally have belonged to the
    # disabled block above; 'rbitmap-test' performs the same setup either way.
    sys.path.append('.')
    import mppc
    reload(mppc)
    from mppc import *

    if 'ptype-load':
        data = b'for whom the bell tolls, the bell tolls for thee.'
        history = History()
        cdata = compress(history, data)
        a = rfc2118(_value_=dyn.block(len(cdata))).a.set(cdata)

    if 'ptype-store':
        history = History()
        data = b'for whom the bell tolls, the bell tolls for thee.'
        res = pstr.string(length=len(data)).set(data)
        x = rfc2118()
        x.reference(res)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment