Created
October 12, 2021 10:04
-
-
Save jrsmile/77b036189d609bf1633b5e7b9ab969ee to your computer and use it in GitHub Desktop.
tcp_reassemble
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
@classmethod | |
def tcp_reassemble(cls, data, metadata): | |
"""[called by sniff(session=TCPSession), | |
reassembles the tcp stream if packet spans over multiple TCP packets] | |
Args: | |
data ([Packet]): [a raw packed strippt by the TCP Layer] | |
metadata ([dict]): [stores partial streams] | |
Returns: | |
[Packet]: [reassembled Packet] | |
""" | |
#pylint: disable=unused-argument | |
if struct.unpack("<I", data[:4])[0] == 1101025874 or (struct.unpack("<I", data[:4])[0] == 0 and struct.unpack("<I", data[4:8])[0] == 0 and struct.unpack("<I", data[8:12])[0] == 0 and struct.unpack("<I", data[12:16])[0] == 0): | |
length = struct.unpack("<I", data[24:28])[0] # get bundle_len | |
fragment = PROTOCOL(data) | |
if fragment.compressed: | |
# data after header, was deflate compressed | |
try: | |
# without 40 bit PROTOCOL bundle header, with 2 bit deflate header | |
print(str(len(data)) + " ", end='') | |
inflated = zlib.decompress( | |
data[42:length], -zlib.MAX_WBITS, length) | |
# rejoin data with inflated segments omitting the deflate header | |
data = b"".join([data[:40], inflated]) | |
print(str(len(data)) + "") | |
except Exception as e: | |
print("\n" + F"{e}") | |
return data # void packet if inflate error | |
if len(data) > length: # got to much | |
# return PROTOCOL bundle up to bundle_len | |
pkt = data[:length] # cut of rest | |
#print( f"### Got MORE actual len: {len(data)} proposed bundle_len: {length} ###") | |
return PROTOCOL(pkt) | |
elif len(data) < length: # got less, not working | |
#print( f"### Got LESS actual len: {len(data)} proposed bundle_len: {length} ###") | |
return None # push rest back to queue | |
else: | |
#print( f"### Got EXAC actual len: {len(data)} proposed bundle_len: {length} ###") | |
return PROTOCOL(data) # got exactly one bundle in one packet | |
else: | |
return data # void packet if not an PROTOCOL bundle |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment