Skip to content

Instantly share code, notes, and snippets.

@arlandism
Created December 20, 2022 17:31
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save arlandism/e798a0b6b15fa202b99d566dc03f602b to your computer and use it in GitHub Desktop.
Save arlandism/e798a0b6b15fa202b99d566dc03f602b to your computer and use it in GitHub Desktop.
# First four bytes of a capture file whose multi-byte header fields are stored
# in the order the bytes are written here (a1 b2 c3 d4).
MAGIC_NUMBER = b'\xa1\xb2\xc3\xd4'
# The same marker with its bytes swapped; the rest of this script only handles
# files that start with this variant and decodes record fields as little-endian.
REVERSED_MAGIC_NUMBER = MAGIC_NUMBER[::-1]
def parse_packet_length(raw, byteorder='little'):
    """Return the full size in bytes of the capture record that starts at raw[0].

    The size is the captured-length field stored in bytes 8..11 of the record
    header plus the 16 bytes of the record header itself.

    Args:
        raw: Bytes-like object beginning at a record header.
        byteorder: Byte order of the length field ('little' or 'big').

    Returns:
        Header size plus captured payload size, as an int.

    Raises:
        ValueError: If ``raw`` holds fewer than 16 bytes. Without this check a
            truncated header would silently decode to a wrong length, because
            ``int.from_bytes`` accepts a short slice.
    """
    record_header_size = 16
    if len(raw) < record_header_size:
        raise ValueError(
            "truncated record header: need %d bytes, got %d"
            % (record_header_size, len(raw))
        )
    return int.from_bytes(raw[8:12], byteorder) + record_header_size
class Packet:
    """One capture record: a 16-byte record header followed by an Ethernet frame.

    Attributes:
        frame: EthernetFrame parsed from the captured bytes of this record.
    """

    # Size of the per-record header that precedes the captured frame bytes.
    HEADER_SIZE = 16

    def __init__(self, raw):
        """Parse the record that begins at raw[0].

        Args:
            raw: Bytes-like object starting at the record header. It may extend
                past the end of this record; the excess is ignored.
        """
        header_size = self.HEADER_SIZE
        # Bytes 8..11: number of bytes actually captured; bytes 12..15: the
        # packet's original length on the wire. Both are little-endian.
        length = int.from_bytes(raw[8:12], 'little') + header_size
        untruncated_length = int.from_bytes(raw[12:16], 'little') + header_size
        if length != untruncated_length:
            print("Packet was truncated")
        # Bound the frame by the captured length so that bytes belonging to
        # whatever follows this record never end up inside the frame.
        self.frame = EthernetFrame(raw[header_size:length])
class EthernetFrame:
    """Ethernet frame whose payload must be an IPv4 datagram.

    Attributes:
        mac_dest: Destination MAC address (bytes 0..5) as a hex string.
        mac_src: Source MAC address (bytes 6..11) as a hex string.
        ethertype: Big-endian integer read from bytes 12..13.
        datagram: IPV4Datagram parsed from byte 14 onward.
    """

    IPV4_ETHERTYPE = 0x0800

    def __init__(self, raw):
        """Split ``raw`` into addresses, ethertype and payload, then parse the payload.

        Raises:
            AssertionError: If the ethertype is not IPV4_ETHERTYPE.
        """
        dest_bytes = raw[:6]
        src_bytes = raw[6:12]
        type_bytes = raw[12:14]
        payload = raw[14:]

        self.mac_dest = dest_bytes.hex()
        self.mac_src = src_bytes.hex()
        self.ethertype = int.from_bytes(type_bytes, 'big')
        assert self.ethertype == self.IPV4_ETHERTYPE, "expected IPV4 ethertype"
        self.datagram = IPV4Datagram(payload)
class IPV4Datagram:
    """IPv4 datagram that carries a TCP segment.

    Attributes:
        version: IP version taken from the high nibble of byte 0 (must be 4).
        header_len: IP header length in bytes (low nibble of byte 0 times 4).
        total_len: Total datagram length in bytes (big-endian bytes 2..3).
        protocol: Protocol number from byte 9 (must be TCP_PROTOCOL).
        src_ip: Source address (bytes 12..15) as a hex string.
        dest_ip: Destination address (bytes 16..19) as a hex string.
        tcp_segment: TCPSegment parsed from the bytes after the IP header.
    """

    TCP_PROTOCOL = 0x06

    def __init__(self, raw):
        """Parse the datagram that starts at raw[0].

        Args:
            raw: Bytes-like object starting at the IP header. It may carry
                trailing bytes beyond ``total_len`` (for example link-layer
                padding); they are not passed on to the TCP parser.

        Raises:
            AssertionError: If the version is not 4, the header length is
                outside 20..60 bytes, the total length is smaller than the
                header, or the protocol is not TCP.
        """
        self.version = raw[0] >> 4
        assert self.version == 4
        # The length nibble counts 32-bit words; convert to bytes. A header
        # without options is 20 bytes and the 4-bit field caps it at 60, so
        # anything outside that range is malformed.
        self.header_len = (raw[0] & 0x0f) * 4
        assert 20 <= self.header_len <= 60
        self.total_len = int.from_bytes(raw[2:4], 'big')
        assert self.header_len <= self.total_len <= 65535
        self.protocol = raw[9]
        assert self.protocol == self.TCP_PROTOCOL, "expected tcp protocol"
        self.src_ip = raw[12:16].hex()
        self.dest_ip = raw[16:20].hex()
        # Hand over only the bytes that belong to this datagram so trailing
        # padding cannot be mistaken for TCP payload.
        self.tcp_segment = TCPSegment(raw[self.header_len:self.total_len],
                                      self.header_len, self.total_len)
class TCPSegment:
    """TCP segment header fields plus the application payload.

    Attributes:
        source_port: Big-endian integer from bytes 0..1.
        dest_port: Big-endian integer from bytes 2..3.
        sequence_num: Big-endian integer from bytes 4..7.
        data_offset: TCP header length in bytes (high nibble of byte 12 times 4).
        application_data: Payload bytes that follow the TCP header.
    """

    def __init__(self, raw, ip_header_len, ip_datagram_len):
        """Parse the segment that starts at raw[0].

        Args:
            raw: Bytes-like object starting at the TCP header; may contain
                trailing bytes that are not part of the segment.
            ip_header_len: Length in bytes of the enclosing IP header.
            ip_datagram_len: Total length in bytes of the enclosing IP datagram.

        Raises:
            AssertionError: If the TCP header length is outside 20..60 bytes.
        """
        self.source_port = int.from_bytes(raw[0:2], 'big')
        self.dest_port = int.from_bytes(raw[2:4], 'big')
        self.sequence_num = int.from_bytes(raw[4:8], 'big')
        # The offset nibble counts 32-bit words; convert to bytes.
        self.data_offset = (raw[12] >> 4) * 4
        assert self.data_offset >= 20 and self.data_offset <= 60
        # The segment (header + payload) is whatever the IP datagram holds after
        # its own header. Cutting the payload there keeps trailing padding out
        # of the application data. max() guards against inconsistent lengths
        # turning into a negative slice bound.
        segment_len = ip_datagram_len - ip_header_len
        payload_end = max(segment_len, self.data_offset)
        self.application_data = raw[self.data_offset:payload_end]
def open_pcap_file(path="net.cap"):
    """Return the entire contents of the capture file as bytes.

    Args:
        path: File to read; defaults to ``net.cap`` in the current directory.

    Returns:
        The file's raw bytes.

    Raises:
        OSError: If the file cannot be opened or read.
    """
    # Read-only binary mode is enough (no write permission needed), and the
    # context manager closes the handle, which the bare open() never did.
    with open(path, "rb") as capture:
        return capture.read()
def is_same_byte_order(data):
    """Return True when the 4-byte sequence ``data`` matches MAGIC_NUMBER.

    Args:
        data: The first four bytes of the capture file.

    Raises:
        AssertionError: If ``data`` is not exactly four bytes long.
    """
    # Validate the argument itself; the check used to read a module-level
    # variable, so it only worked when that global happened to exist.
    assert len(data) == 4
    return all(actual == expected
               for actual, expected in zip(data, MAGIC_NUMBER))
def is_reversed_byte_order(header):
    """Return True when the 4-byte sequence ``header`` matches REVERSED_MAGIC_NUMBER.

    Raises:
        AssertionError: If ``header`` is not exactly four bytes long.
    """
    assert len(header) == 4
    # The length check above guarantees zip() pairs up all four positions.
    for actual, expected in zip(header, REVERSED_MAGIC_NUMBER):
        if actual != expected:
            return False
    return True
# Load the whole capture and make sure it starts with the byte-swapped magic
# number, the only layout the parsing below supports.
data = open_pcap_file()
header = data[:4]
if is_same_byte_order(header):
    print("Same host byte order. Only different is supported.")
    exit(1)
if not is_reversed_byte_order(header):
    print("Weird header format. Aborting")
    exit(1)
print("Different host byte order")

# State filled in while walking the records.
mac_addresses = {}   # MAC address (hex string) -> number of frames it appears in
segments = []        # TCP segments kept for reassembly
router_ip = None     # source IP of the first packet, set inside the loop
packet_count = 0
start_of_next_packet = 24  # the first record begins at byte 24 of the file
# Walk the capture record by record, counting MAC addresses and collecting
# the TCP segments sent by every host other than the first packet's sender.
while start_of_next_packet < len(data):
    # Only the 16-byte record header is needed to learn the record's size;
    # slicing just that avoids copying the rest of the file each iteration.
    packet_len = parse_packet_length(
        data[start_of_next_packet:start_of_next_packet + 16])
    # The record occupies [start, start + packet_len). The end bound used to
    # be one byte too large, which pulled the first byte of the following
    # record into this packet.
    packet_end = start_of_next_packet + packet_len
    packet = Packet(data[start_of_next_packet:packet_end])
    frame = packet.frame
    if packet_count == 0:
        # The sender of the very first packet is the reference address.
        router_ip = frame.datagram.src_ip
    for mac in (frame.mac_src, frame.mac_dest):
        mac_addresses[mac] = mac_addresses.get(mac, 0) + 1
    if frame.datagram.src_ip != router_ip:
        segments.append(frame.datagram.tcp_segment)
    start_of_next_packet = packet_end
    packet_count += 1
# Rebuild the byte stream: order the collected segments by sequence number and
# append each distinct segment's payload once.
assert len(mac_addresses) == 2, "Expected only 2 mac addresses"
ordered_segments = sorted(segments, key=lambda segment: segment.sequence_num)
raw_http_data = bytearray()
unique_seq_nums = set()
for segment in ordered_segments:
    if not segment.application_data:
        # A segment without payload contributes nothing, but it can share its
        # sequence number with the data segment that follows it. Recording it
        # as "seen" would make the duplicate check drop that real data.
        continue
    if segment.sequence_num in unique_seq_nums:
        # Same sequence number seen before: skip it.
        continue
    unique_seq_nums.add(segment.sequence_num)
    raw_http_data += segment.application_data
def find_http_response_body(byte_arr):
    """Return the index of the first byte after the first CRLF CRLF in ``byte_arr``.

    Args:
        byte_arr: bytes, bytearray, or any iterable of ints in range(256).

    Returns:
        The offset just past the first ``\\r\\n\\r\\n`` sequence, or None when
        the sequence does not occur.
    """
    delimiter = b"\r\n\r\n"
    # Search on a bytes object: comparing a bytes/bytearray slice with a list
    # of ints is never equal, so the previous element-wise scan could not
    # match on the bytearray it is called with.
    position = bytes(byte_arr).find(delimiter)
    if position == -1:
        return None
    return position + len(delimiter)
def save_image(buffer):
    """Write ``buffer`` to ``hero.jpg`` in the current directory.

    An existing file of that name is truncated and overwritten.
    """
    with open("hero.jpg", "wb+") as image_file:
        image_file.write(buffer)
# Locate where the response body begins and write everything from there on
# to disk as the image.
body_start = find_http_response_body(raw_http_data)
image_bytes = raw_http_data[body_start:]
save_image(image_bytes)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment