@jrsmile
Created October 12, 2021 10:04
tcp_reassemble
@classmethod
def tcp_reassemble(cls, data, metadata):
"""[called by sniff(session=TCPSession),
reassembles the tcp stream if packet spans over multiple TCP packets]
Args:
data ([Packet]): [a raw packed strippt by the TCP Layer]
metadata ([dict]): [stores partial streams]
Returns:
[Packet]: [reassembled Packet]
"""
#pylint: disable=unused-argument
if struct.unpack("<I", data[:4])[0] == 1101025874 or (struct.unpack("<I", data[:4])[0] == 0 and struct.unpack("<I", data[4:8])[0] == 0 and struct.unpack("<I", data[8:12])[0] == 0 and struct.unpack("<I", data[12:16])[0] == 0):
length = struct.unpack("<I", data[24:28])[0] # get bundle_len
fragment = PROTOCOL(data)
if fragment.compressed:
# data after header, was deflate compressed
try:
# without 40 bit PROTOCOL bundle header, with 2 bit deflate header
print(str(len(data)) + " ", end='')
inflated = zlib.decompress(
data[42:length], -zlib.MAX_WBITS, length)
# rejoin data with inflated segments omitting the deflate header
data = b"".join([data[:40], inflated])
print(str(len(data)) + "")
except Exception as e:
print("\n" + F"{e}")
return data # void packet if inflate error
if len(data) > length: # got to much
# return PROTOCOL bundle up to bundle_len
pkt = data[:length] # cut of rest
#print( f"### Got MORE actual len: {len(data)} proposed bundle_len: {length} ###")
return PROTOCOL(pkt)
elif len(data) < length: # got less, not working
#print( f"### Got LESS actual len: {len(data)} proposed bundle_len: {length} ###")
return None # push rest back to queue
else:
#print( f"### Got EXAC actual len: {len(data)} proposed bundle_len: {length} ###")
return PROTOCOL(data) # got exactly one bundle in one packet
else:
return data # void packet if not an PROTOCOL bundle