Skip to content

Instantly share code, notes, and snippets.

@arizvisa
Created October 6, 2021 00:30
Show Gist options
  • Save arizvisa/1ec05efb12fe1b10c0096b61c7237f99 to your computer and use it in GitHub Desktop.
Save arizvisa/1ec05efb12fe1b10c0096b61c7237f99 to your computer and use it in GitHub Desktop.
RFC2118 (found in an old home directory)
import ptypes
from ptypes import *
from ptypes.bitmap import RBitmap, WBitmap
import six, array, math
class Encode(object):
    """Bit-level encoders for the RFC 2118 token formats.

    Every method appends bits to ``wb`` by calling ``wb.push(value, bits)``
    and returns the sum of whatever those ``push`` calls returned.
    """

    @classmethod
    def byte(cls, wb, by):
        """Encode the literal byte ``by``.

        A value with bit 7 set is written as the 2-bit prefix ``10`` followed
        by its low 7 bits; any other value is written as 8 plain bits.
        """
        if by & 0x80:
            return wb.push(0b10, 2) + wb.push(by & 0x7f, 7)
        return wb.push(by, 8)

    @classmethod
    def offset(cls, wb, offset):
        """Encode the offset half of a copy tuple.

        ============== ========== =====================
        offset range   prefix     payload
        ============== ========== =====================
        0x000..0x03f   ``1111``   6 bits, offset
        0x040..0x13f   ``1110``   8 bits, offset-0x40
        0x140..0x1fff  ``110``    13 bits, offset-0x140
        ============== ========== =====================

        Raises ``ValueError`` (carrying the offset) when the offset is
        negative or does not fit in the largest form.
        """
        # A negative offset would otherwise fall into the first branch and be
        # pushed as a negative 6-bit payload.
        if offset < 0:
            raise ValueError(offset)
        if offset < 0x40:
            return wb.push(0b1111, 4) + wb.push(offset, 6)
        if offset < 0x140:
            return wb.push(0b1110, 4) + wb.push(offset - 0x040, 8)
        if offset < 0x2000:
            return wb.push(0b110, 3) + wb.push(offset - 0x140, 13)
        raise ValueError(offset)

    @classmethod
    def length(cls, wb, length):
        """Encode the length half of a copy tuple.

        A length in ``[2**k, 2**(k+1))`` is written as a ``k``-bit marker
        (``k-1`` one bits followed by a zero) and then the low ``k`` bits of
        the length.  Length 3 is the special case ``k == 1``: only the single
        ``0`` marker bit is written.

        Raises ``ValueError`` (carrying the length) unless
        ``3 <= length < 0x2000``.
        """
        if not 3 <= length < 0x2000:
            raise ValueError(length)

        # Smallest k >= 1 such that length < 2**(k+1).
        bits = 1
        while length >= (2 << bits):
            bits += 1

        mask = (1 << bits) - 1
        written = wb.push(mask ^ 1, bits)
        if bits > 1:
            written += wb.push(length & mask, bits)
        return written

    @classmethod
    def tuple(cls, wb, offset, length):
        """Encode a complete copy tuple: the offset, then the length."""
        return cls.offset(wb, offset) + cls.length(wb, length)
class Decode(object):
    """Bit-level decoders for the RFC 2118 token formats.

    ``input`` is always a bit reader exposing ``consume(nbits)`` (returns the
    next ``nbits`` bits as an integer) and ``size()`` (bits still unread).
    """

    @classmethod
    def byteX(cls, input, prefix):
        """Read the 7 payload bits of a literal.

        A truthy ``prefix`` means the literal was introduced by ``10`` and has
        its high bit set; a falsy one means it was introduced by ``0``.
        """
        if prefix:
            return input.consume(7) | 0x80
        return input.consume(7) & 0x7f

    @classmethod
    def offsetX(cls, input, bits):
        """Read an offset payload of ``bits`` bits and add back its bias.

        Raises ``ValueError`` if ``bits`` is not one of 6, 8 or 13.
        """
        if bits == 6:       # 1111 xxxxxx
            return input.consume(6)
        if bits == 8:       # 1110 xxxxxxxx
            return 0x40 + input.consume(8)
        if bits == 13:      # 110 xxxxxxxxxxxxx
            return 0x140 + input.consume(13)
        raise ValueError(bits)

    @classmethod
    def lengthX(cls, input, bits):
        """Read a length payload whose marker was ``bits`` bits long.

        A 1-bit marker carries no payload and always means length 3.
        Raises ``ValueError`` unless ``1 <= bits <= 13``.
        """
        if bits < 1 or bits > 13:
            raise ValueError(bits)
        low = 1 if bits == 1 else input.consume(bits)
        return low + 2 ** bits

    @classmethod
    def offset(cls, input):
        """Decode an offset; the leading ``11`` must already be consumed."""
        if not input.consume(1):
            return cls.offsetX(input, 13)
        if input.consume(1):
            return cls.offsetX(input, 6)
        return cls.offsetX(input, 8)

    @classmethod
    def length(cls, input):
        """Decode a length: count marker bits up to the first zero, then read
        the payload.

        Raises ``ValueError`` when 12 one bits are read without a terminating
        zero.
        """
        for count in range(1, 13):
            if not input.consume(1):
                return cls.lengthX(input, count)
        raise ValueError('Error decoding length (missing terminal bit)')

    @classmethod
    def tuple(cls, input):
        """Decode ``(offset, length)``; the leading ``11`` must already be
        consumed."""
        offset = cls.offset(input)
        length = cls.length(input)
        return offset, length

    @classmethod
    def decode(cls, history, input):
        """Decode every token in ``input`` and return the bytes as an
        ``array.array('B')``.

        ``history`` must provide ``get(offset, length)`` and ``push(data)``;
        each decoded literal or copied run is pushed onto it.
        """
        result = array.array('B')

        # The shortest token (a literal below 0x80) is 8 bits long, so a tail
        # shorter than that can only be byte-alignment padding.
        while input.size() >= 8:
            if not input.consume(1):        # 0 xxxxxxx
                by = cls.byteX(input, 0b0)
            elif not input.consume(1):      # 10 xxxxxxx
                by = cls.byteX(input, 0b10)
            else:                           # 11 <offset> <length>
                offset, length = cls.tuple(input)
                data = history.get(offset, length)
                # bytearray() yields integers whether ``data`` is a str, a
                # bytes object or a sequence of ints.
                result.extend(bytearray(data))
                history.push(data)
                continue
            result.append(by)
            history.push(by)
        return result
class History(object):
    """Fixed-size ring buffer of bytes used as the compression history.

    Attributes:
        buffer: ``array.array('B')`` holding the history bytes.
        offset: index in ``buffer`` where the next byte will be written.
        amount: number of bytes written so far, capped at the buffer size.

    Attributes that are not defined here are looked up on ``buffer``.
    """

    def __init__(self, length=0x2000):
        self.offset, self.amount = 0, 0
        self.buffer = array.array('B', [0] * length)

    @staticmethod
    def _octets(data):
        """Return ``data`` (an array or any ``array('B')`` initializer) as an
        immutable byte string."""
        packed = data if isinstance(data, array.array) else array.array('B', data)
        return bytes(bytearray(packed))

    def copy(self):
        """Return an independent instance with the same contents and position."""
        res = self.__class__()
        res.offset, res.amount = self.offset, self.amount
        res.buffer[:] = self.buffer
        return res

    def size(self):
        """Return the capacity of the buffer in bytes."""
        return len(self.buffer)

    def reset(self):
        """Zero the history, rewind the write position and return the old
        buffer."""
        previous = self.buffer
        self.buffer = array.array('B', [0] * len(previous))
        # Rewind as well: a stale write position would point into the freshly
        # zeroed buffer and be treated as real history.
        self.offset, self.amount = 0, 0
        return previous

    def __getattr__(self, name):
        # Only reached for attributes missing on the instance.  If ``buffer``
        # itself is missing (e.g. an instance made with ``__new__``), looking
        # it up here would recurse without end.
        if name == 'buffer':
            raise AttributeError(name)
        return getattr(self.buffer, name)

    def get(self, offset, length):
        """Return ``length`` bytes starting ``offset`` bytes behind the write
        position, as a byte string.

        Reads wrap around the end of the ring.  When ``length`` exceeds a
        positive ``offset`` the copy overlaps its own output, so the bytes
        produced so far are repeated, as in an LZ77 back-reference.
        """
        size = len(self.buffer)
        if size == 0:
            return bytes(bytearray())

        start = self.offset - offset
        out = bytearray()
        for i in range(length):
            if 0 < offset <= i:
                # Source position has caught up with the write position:
                # continue from what this very copy already produced.
                out.append(out[i - offset])
            else:
                out.append(self.buffer[(start + i) % size])
        return bytes(out)

    def push(self, data):
        """Append ``data`` to the history and return the number of bytes
        written.

        ``data`` may be a byte string or bytearray, a text string (each
        character stored by ordinal), an iterable of any of these, or a
        single integer.  Raises ``ValueError`` for anything else.
        """
        if isinstance(data, (bytes, bytearray)):
            return sum(self.push(by) for by in bytearray(data))
        if isinstance(data, type(u'')):
            return sum(self.push(ord(ch)) for ch in data)
        if isinstance(data, (tuple, list)) or hasattr(data, '__iter__'):
            return sum(map(self.push, data))
        if hasattr(data, '__index__'):      # integer-like
            self.buffer[self.offset] = data
            return self.adjust(1)
        raise ValueError(data)

    def adjust(self, count):
        """Advance the write position by ``count`` (modulo the buffer size)
        and return ``count``."""
        size = len(self.buffer)
        self.amount = min((self.amount + count, size))
        self.offset = (self.offset + count) % size
        return count

    def find(self, string, offset=0):
        """Return the index of the first occurrence of ``string`` in the
        whole buffer at or after ``offset``.

        Raises ``ValueError`` if there is none or ``string`` is empty.
        """
        needle = self._octets(string)
        index = self._octets(self.buffer).find(needle, offset) if needle else -1
        if index < 0:
            raise ValueError('substring not found')
        return index

    def find_latest(self, string, offset=0):
        """Return the index of the last occurrence of ``string`` in the whole
        buffer that starts at or after ``offset``.

        Raises ``ValueError`` if there is none or ``string`` is empty.
        """
        needle = self._octets(string)
        index = self._octets(self.buffer).rfind(needle, offset) if needle else -1
        if index < 0:
            raise ValueError('substring not found')
        return index

    def recent(self, string, offset=0):
        """Return the index of the most recent occurrence of ``string`` in
        the bytes before the write position, ignoring the newest ``offset``
        bytes.

        Raises ``ValueError`` if there is none or ``string`` is empty.
        """
        needle = self._octets(string)
        window = self._octets(self.buffer[:max(self.offset - offset, 0)])
        index = window.rfind(needle) if needle else -1
        if index < 0:
            raise ValueError('substring not found')
        return index

    def longest(self, data):
        """Find the longest prefix of ``data`` (at least 3 bytes) present
        before the write position.

        Returns ``(index, length)`` for the most recent occurrence of that
        prefix, or ``()`` when not even a 3-byte prefix is found.
        """
        target = self._octets(data)
        window = self._octets(self.buffer[:self.offset])
        best = ()
        for size in range(3, 1 + len(target)):
            index = window.rfind(target[:size])
            if index < 0:
                # A longer prefix cannot match once a shorter one has failed.
                break
            best = (index, size)
        return best

    def __repr__(self):
        cls, data = self.__class__, self._octets(self.buffer[:self.offset])
        return '\n'.join(["{!s} length={:#x}".format(cls, len(self.buffer)), ptypes.utils.hexdump(data)])
def compress(history, data):
    """Compress ``data`` against ``history`` and return the serialized bits.

    ``history`` is searched for the longest earlier occurrence of the
    upcoming bytes.  A hit is emitted as an (offset, length) copy tuple,
    anything else as a literal byte.  Everything that gets encoded is also
    pushed onto ``history``, so the object is modified by the call.

    ``data`` may be a byte string, a bytearray, or text (encoded as latin-1
    so that each character maps to one byte).
    """
    if isinstance(data, type(u'')):
        data = data.encode('latin-1')
    octets = bytearray(data)

    # Encode.length() rejects lengths of 0x2000 and up, so looking further
    # ahead than this can never produce an encodable match.
    longest_match = 0x1fff

    index, result = 0, WBitmap()
    while index < len(octets):
        match = history.longest(octets[index : index + longest_match])

        # If a match was found, encode how far back it starts and how long
        # it is.  The distance is measured from the history's write position
        # rather than from our position in ``data``, so it stays correct when
        # the history already held bytes before this call.
        if match:
            offset, length = match
            relative = history.offset - offset
            Encode.tuple(result, relative % history.size(), length)
            chunk = octets[index : index + length]

        # Otherwise encode the byte itself and move on to the next one.
        else:
            Encode.byte(result, octets[index])
            chunk = octets[index]

        # push() returns how many bytes it stored, which is exactly how far
        # the input position has to advance.
        index += history.push(chunk)
    return result.serialize()
def decompress(history, data):
    """Decompress the bit stream ``data`` against ``history`` and return the
    result as a byte string.

    Every decoded literal and copied run is pushed onto ``history``, so the
    object is modified by the call.
    """
    stream, result = RBitmap(data), bytearray()

    # The shortest token (a literal below 0x80) occupies 8 bits, so a tail
    # shorter than that can only be byte-alignment padding.
    while stream.size() >= 8:
        # 0 xxxxxxx
        if not stream.consume(1):
            by = Decode.byteX(stream, 0b0)

        # 10 xxxxxxx
        elif not stream.consume(1):
            by = Decode.byteX(stream, 0b10)

        # 11 x...
        else:
            offset, length = Decode.tuple(stream)
            chunk = history.get(offset, length)
            # bytearray() yields integers whether get() returned a str or a
            # bytes object.
            result.extend(bytearray(chunk))
            history.push(chunk)
            continue

        result.append(by)
        history.push(by)
    return bytes(result)
class rfc2118(ptype.encoded_t):
    """``ptype.encoded_t`` whose payload is compressed with the RFC 2118
    routines in this module.

    A ``history`` attribute may be supplied through ``attrs``; when none is
    present after the base initializer has run, a fresh ``History`` is made.
    """

    def __init__(self, **attrs):
        super(rfc2118, self).__init__(**attrs)
        if not hasattr(self, 'history'):
            self.history = History()

    def flush(self):
        """Reset this instance's history and return ``self``."""
        self.history.reset()
        return self

    def encode(self, object, **attrs):
        """Compress ``object.serialize()`` and pass the result, wrapped in a
        ``ptype.block`` of matching length, to the base class ``encode``.

        The codec works on a copy of ``self.history``, so the instance's own
        history is not advanced by this call.
        """
        cdata = compress(self.history.copy(), object.serialize())
        res = ptype.block(length=len(cdata)).set(cdata)
        return super(rfc2118, self).encode(res)

    def decode(self, object, **attrs):
        """Decompress ``object.serialize()`` and pass the result, wrapped in
        a ``ptype.block`` of matching length, to the base class ``decode``.

        The codec works on a copy of ``self.history``, so the instance's own
        history is not advanced by this call.
        """
        data = decompress(self.history.copy(), object.serialize())
        res = ptype.block(length=len(data)).set(data)
        return super(rfc2118, self).decode(res)
# Ad-hoc manual checks, run only when the file is executed as a script.
# The statements use Python 2 syntax throughout (print statement,
# str.encode('hex') / str.decode('hex'), reload(), file()).
# NOTE(review): every line here sits at column 0, so which statements belong
# under which guard cannot be told from this text -- restore the indentation
# before running.
if __name__ == '__main__':
# Each `if '<label>':` below tests a non-empty string literal, so the guard is
# always true; the literal only names the section.
if 'wbitmap-test':
self = WBitmap()
print self.data.tostring().encode('hex')
self.push(0x1, 1)
self.push(0x1, 1)
self.push(0x1, 1)
self.push(0x1, 1)
self.push(0x7, 3)
self.push(0x1, 1)
self.push(0b1000001010000100100001101000100111, 32 + 3)
# `data` is rebound five times in a row; only the last value is still bound
# when the loop below reads it.
data = 'a6532994ca6532994ca6532994ca6532994c'.decode('hex')
data = 'a6532994ca6532994ca6532994ca6532994c'
data = '\xcc'*15
data = 'a6532994ca6532994ca6532994ca6532'
data = 'a6532994ca6532994ca6532994ca65324c'
B = ptypes.bitmap
b, self = B.zero, WBitmap()
# Push every byte both into a WBitmap and into a ptypes.bitmap value, then
# print the two side by side.
for by in map(six.byte2int, data):
# A byte with the high bit set becomes the 9-bit value 0x100 | (low 7 bits);
# any other byte is pushed unchanged as 8 bits.
bits, integer = (9, 0x100 | (by & 0x7f)) if by & 0x80 else (8, by)
self.push(integer, bits)
b = B.push(b, B.new(integer, bits))
print self, bin(self.int())
print B.repr(b), B.string(b)
#x = History()
#x.push('for whom the bell tolls, the bell tolls for thee.')
#found = x.find_latest('whom the bell tolls')
#print x.buffer[found:].tostring().rstrip('\0')
# Compress a sample sentence against an empty history and print the result.
if 'compression-test':
history = History()
res = compress(history, 'for whom the bell tolls, the bell tolls for thee.')
# The next line is a bare string expression with no effect; presumably it
# records the expected literal/<offset,length> token stream -- TODO confirm.
'for whom the bell tolls,<16,15> <40,4><19,3>e.'
print res
#print res.str().encode('hex')
#print bin(res.int())
# Encode the length 3 on its own and print both the return value and the bitmap.
if 'encode-length-test':
res = WBitmap()
print Encode.length(res, 3)
print res
# Compress 0x2000 identical bytes and print the size and repr of the output.
if 'compression-test-large':
x = History()
data = '\xcc' * 0x2000
res = compress(x, data)
print len(res)
print repr(res)
# Consume a known bit pattern in random-sized pieces (1 to 32 bits) and print
# whether each piece matches the corresponding slice of the binary string.
if 'rbitmap-test':
import sys, random
sys.path.append('.')
# NOTE(review): imports and reloads a module named `mppc` from the current
# directory -- presumably this very file; confirm.
import mppc; reload(mppc)
from mppc import *
n = 0x2bb2b2b2abcd3ff2100874666abdef
# Binary and hex digit strings of n, without the '0b'/'0x' prefix or a
# trailing 'L'.
bn, hn = (F(n)[2:].rstrip('L') for F in (bin, hex))
# Left-pad the binary string to four bits per hex digit.
bn = bn.zfill(len(hn) * 4)
self = RBitmap(hn.decode('hex'))
res, count = 0, 0
while self.size() > 0:
count = min((self.size(), random.randint(1, 32)))
res = self.consume(count)
print count, bn[:count] == bin(res)[2:].zfill(count)
bn = bn[count:]
# Print reference values for the byte, offset and length encodings.
if 'decompression-table':
print 'byte'
print bin(0)[2:], bin(2**7 - 1)[2:]
print bin(2)[2:], bin(2**7 - 1)[2:]
print 'offset'
print bin(0b1111)[2:], bin(2**6 - 1)[2:]
print bin(0b1110)[2:], bin(2**8 - 1)[2:]
print bin(0b0110)[2:], bin(2**13 - 1)[2:]
print 'length'
# Same pattern/mask arithmetic as Encode.length, but only for 1..12 bits.
for bits in range(1, 13):
res = 2 ** bits
size, mask = 2 * res, res - 1
pattern = mask ^ 1
print bin(pattern)[2:], bin(mask)[2:], bin(size)
# Round trip: compress with one fresh history, decompress with another.
if 'decompression-test':
history = History()
res = compress(history, 'for whom the bell tolls, the bell tolls for thee.')
print res
history = History()
res = decompress(history, res)
print res
# `0 and ...` is always false, so this section never runs.
if 0 and 'decompression-test-huge':
history = History()
data = file('packetlog.txt', 'rt').read()
res = compress(history, data)
history = History()
# NOTE(review): compress() already returns `result.serialize()`, so calling
# .serialize() on that value here (and .tostring() on the decompress() result
# below) looks suspect -- confirm before enabling this section.
res = decompress(history, res.serialize())
print data[-16:]
print res.tostring()[-16:]
import sys; sys.path.append('.')
import mppc; reload(mppc)
from mppc import *
# Build an rfc2118 instance whose _value_ is a block sized to the compressed
# data.
if 'ptype-load':
data = 'for whom the bell tolls, the bell tolls for thee.'
history = History()
cdata = compress(history, data)
# NOTE(review): `.a` is presumably ptypes' allocate shortcut -- TODO confirm.
a = rfc2118(_value_=dyn.block(len(cdata))).a.set(cdata)
# Point an rfc2118 instance at a plain string object via reference().
if 'ptype-store':
history = History()
data = 'for whom the bell tolls, the bell tolls for thee.'
res = pstr.string(length=len(data)).set(data)
x = rfc2118()
x.reference(res)
#x.d.set('i still fucking love you and miss you, camacho.').c
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment