Created
December 20, 2022 17:31
-
-
Save arlandism/e798a0b6b15fa202b99d566dc03f602b to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Four-byte patterns compared against the first four bytes of the capture
# file. The script reports a match on MAGIC_NUMBER as "same host byte order"
# and a match on REVERSED_MAGIC_NUMBER as "different host byte order".
MAGIC_NUMBER = bytes.fromhex('a1b2c3d4')
REVERSED_MAGIC_NUMBER = bytes.fromhex('d4c3b2a1')
def parse_packet_length(raw):
    """Return the total size in bytes of the capture record starting at ``raw``.

    The value is the little-endian integer stored in bytes 8-11 of ``raw``
    plus the 16 bytes occupied by the record header itself.
    """
    record_header_size = 16
    captured_length = int.from_bytes(raw[8:12], byteorder='little')
    return record_header_size + captured_length
class Packet:
    """One capture record: a 16-byte record header followed by an Ethernet frame.

    Attributes:
        frame: the ``EthernetFrame`` parsed from the bytes after the header.
    """

    def __init__(self, raw):
        """Parse the record that starts at the beginning of ``raw``.

        Args:
            raw: bytes beginning with the 16-byte record header. Bytes 8-11
                hold the captured length and bytes 12-15 the untruncated
                length, both little-endian. ``raw`` may extend past the end
                of the record; the excess is ignored.
        """
        header_size = 16
        length = int.from_bytes(raw[8:12], 'little') + header_size
        untruncated_length = int.from_bytes(raw[12:16], 'little') + header_size
        if length != untruncated_length:
            print("Packet was truncated")
        # Bound the frame by the captured length so bytes belonging to the
        # following record never end up inside this frame's payload.
        self.frame = EthernetFrame(raw[header_size:length])
class EthernetFrame:
    """An Ethernet frame whose payload must be an IPv4 datagram.

    Attributes:
        mac_dest: hex string of bytes 0-5 of the frame.
        mac_src: hex string of bytes 6-11 of the frame.
        ethertype: big-endian integer from bytes 12-13.
        datagram: ``IPV4Datagram`` parsed from byte 14 onward.
    """

    IPV4_ETHERTYPE = 0x0800

    def __init__(self, raw):
        """Split ``raw`` into addresses, ethertype and payload.

        Raises:
            AssertionError: if the ethertype is not ``IPV4_ETHERTYPE``.
        """
        dest_bytes, src_bytes = raw[0:6], raw[6:12]
        type_bytes, payload = raw[12:14], raw[14:]

        self.mac_dest = dest_bytes.hex()
        self.mac_src = src_bytes.hex()
        self.ethertype = int.from_bytes(type_bytes, 'big')
        assert self.ethertype == self.IPV4_ETHERTYPE, "expected IPV4 ethertype"
        self.datagram = IPV4Datagram(payload)
class IPV4Datagram:
    """An IPv4 datagram whose payload must be a TCP segment.

    Attributes:
        version: high nibble of byte 0.
        header_len: header length in bytes (low nibble of byte 0 times 4).
        total_len: big-endian integer from bytes 2-3.
        protocol: byte 9.
        src_ip: hex string of bytes 12-15.
        dest_ip: hex string of bytes 16-19.
        tcp_segment: ``TCPSegment`` parsed from the bytes after the header.
    """

    TCP_PROTOCOL = 0x06

    def __init__(self, raw):
        """Parse the IPv4 header in ``raw`` and hand the rest to ``TCPSegment``.

        Raises:
            AssertionError: if the header length is outside 20..60 bytes, the
                total length is outside 20..65535, the protocol is not TCP,
                or the version is not 4.
        """
        version_mask = 0xf0
        internet_header_len_mask = 0x0f
        self.version = (raw[0] & version_mask) >> 4
        # The low nibble counts 32-bit words; multiply by 4 to get bytes.
        self.header_len = (raw[0] & internet_header_len_mask) * 4
        # A header shorter than 20 bytes cannot hold the fixed fields read
        # below; headers longer than 20 bytes (options present) are fine
        # because the payload slice already starts at header_len.
        assert 20 <= self.header_len <= 60, "invalid IPv4 header length"
        self.total_len = int.from_bytes(raw[2:4], 'big')
        assert 20 <= self.total_len <= 65535
        self.protocol = raw[9]
        assert self.protocol == self.TCP_PROTOCOL, "expected tcp protocol"
        self.src_ip = raw[12:16].hex()
        self.dest_ip = raw[16:20].hex()
        assert self.version == 4
        self.tcp_segment = TCPSegment(raw[self.header_len:], self.header_len,
                                      self.total_len)
class TCPSegment:
    """A TCP segment: header fields plus the application payload.

    Attributes:
        source_port: big-endian integer from bytes 0-1.
        dest_port: big-endian integer from bytes 2-3.
        sequence_num: big-endian integer from bytes 4-7.
        data_offset: TCP header length in bytes (high nibble of byte 12
            times 4).
        application_data: the payload bytes that follow the TCP header.
    """

    def __init__(self, raw, ip_header_len, ip_datagram_len):
        """Parse the segment at the start of ``raw``.

        Args:
            raw: bytes starting at the first byte of the TCP header. May
                contain trailing bytes that are not part of the datagram.
            ip_header_len: length in bytes of the enclosing IPv4 header.
            ip_datagram_len: total length in bytes of the enclosing IPv4
                datagram (header included).

        Raises:
            AssertionError: if the TCP header length is outside 20..60 bytes.
        """
        self.source_port = int.from_bytes(raw[0:2], 'big')
        self.dest_port = int.from_bytes(raw[2:4], 'big')
        self.sequence_num = int.from_bytes(raw[4:8], 'big')
        # The high nibble of byte 12 counts 32-bit words; convert to bytes.
        self.data_offset = ((raw[12] & 0xf0) >> 4) * 4
        assert 20 <= self.data_offset <= 60
        # The payload size comes from the IP lengths, not from how many bytes
        # happen to be in ``raw``: anything after the datagram's end (such as
        # link-layer padding) is not application data and must be dropped.
        payload_len = max(ip_datagram_len - ip_header_len - self.data_offset, 0)
        payload_end = self.data_offset + payload_len
        self.application_data = raw[self.data_offset:payload_end]
def open_pcap_file(path="net.cap"):
    """Return the entire contents of the capture file as ``bytes``.

    Args:
        path: file to read; defaults to ``net.cap`` in the current directory.

    The file is opened read-only and closed before returning.
    """
    with open(path, "rb") as capture:
        return capture.read()
def is_same_byte_order(data):
    """Return True when the four bytes in ``data`` equal ``MAGIC_NUMBER``.

    Args:
        data: the four-byte magic field taken from the start of the file.

    Raises:
        AssertionError: if ``data`` is not exactly four bytes long.
    """
    # Validate the argument itself rather than an unrelated module-level name.
    assert len(data) == 4
    return all(actual == expected
               for actual, expected in zip(data, MAGIC_NUMBER))
def is_reversed_byte_order(header):
    """Return True when the four bytes in ``header`` equal ``REVERSED_MAGIC_NUMBER``.

    Raises:
        AssertionError: if ``header`` is not exactly four bytes long.
    """
    assert len(header) == 4
    for actual, expected in zip(header, REVERSED_MAGIC_NUMBER):
        if actual != expected:
            return False
    return True
# Number of times each MAC address is seen as a source or destination.
mac_addresses = {}

# Load the whole capture and inspect its first four bytes.
data = open_pcap_file()
header = data[0:4]

# Only captures whose magic number matches the reversed pattern are handled;
# anything else stops the script with exit status 1.
same_order = is_same_byte_order(header)
if not same_order and is_reversed_byte_order(header):
    print("Different host byte order")
else:
    if same_order:
        print("Same host byte order. Only different is supported.")
    else:
        print("Weird header format. Aborting")
    exit(1)
# Walk the capture record by record, starting after the first 24 bytes of the
# file (presumably the pcap global header; confirm against the file format).
start_of_next_packet = 24
packet_count = 0
segments = []
# Source IP of the very first packet; segments sent from this address are
# left out of the reassembled stream.
router_ip = None
while start_of_next_packet < len(data):
    # Only the 16-byte record header is needed to learn the record size, so
    # avoid copying the whole remainder of the file on every iteration.
    record_header = data[start_of_next_packet:start_of_next_packet + 16]
    packet_len = parse_packet_length(record_header)
    # packet_len already covers header plus captured bytes, so the record
    # ends exactly here; going one further would pull in the first byte of
    # the following record.
    packet_end = start_of_next_packet + packet_len
    packet = Packet(data[start_of_next_packet:packet_end])
    if packet_count == 0:
        router_ip = packet.frame.datagram.src_ip
    # Count every appearance of an address, as source first, then destination
    # registration, matching the order keys are first inserted.
    if packet.frame.mac_src not in mac_addresses:
        mac_addresses[packet.frame.mac_src] = 0
    if packet.frame.mac_dest not in mac_addresses:
        mac_addresses[packet.frame.mac_dest] = 0
    mac_addresses[packet.frame.mac_dest] += 1
    mac_addresses[packet.frame.mac_src] += 1
    if packet.frame.datagram.src_ip != router_ip:
        segments.append(packet.frame.datagram.tcp_segment)
    start_of_next_packet += packet_len
    packet_count += 1
assert len(mac_addresses) == 2, "Expected only 2 mac addresses"
# Rebuild the byte stream: order the collected segments by sequence number and
# append each distinct segment's payload once (retransmissions are dropped).
# NOTE(review): plain numeric ordering does not account for sequence-number
# wraparound; fine for short captures, confirm for long ones.
ordered_segments = sorted(segments, key=lambda segment: segment.sequence_num)
raw_http_data = bytearray()
unique_seq_nums = set()
for segment in ordered_segments:
    # A segment without payload (e.g. a bare acknowledgement) carries the same
    # sequence number as the next data segment from that sender. It must not
    # claim that number, or the real data would be skipped as a duplicate.
    if not segment.application_data:
        continue
    if segment.sequence_num not in unique_seq_nums:
        unique_seq_nums.add(segment.sequence_num)
        raw_http_data += segment.application_data
def find_http_response_body(byte_arr):
    """Return the index of the first byte after the blank line ending the headers.

    Args:
        byte_arr: the raw HTTP message as ``bytes``, ``bytearray`` or any
            iterable of byte values.

    Returns:
        The offset just past the first CR LF CR LF sequence, or ``None`` when
        the sequence does not occur.
    """
    delimiter = b"\r\n\r\n"
    # Compare bytes with bytes: a bytes/bytearray slice never equals a list of
    # integers, so the delimiter has to be a bytes object for a match to occur.
    if not isinstance(byte_arr, (bytes, bytearray)):
        byte_arr = bytes(byte_arr)
    position = byte_arr.find(delimiter)
    if position < 0:
        return None
    return position + len(delimiter)
def save_image(buffer, path="hero.jpg"):
    """Write ``buffer`` to ``path``, replacing any existing file.

    Args:
        buffer: bytes-like image data to store.
        path: destination file; defaults to ``hero.jpg`` in the current
            directory.
    """
    # Write-only binary mode is enough; nothing is read back from the file.
    with open(path, "wb") as f:
        f.write(buffer)
# Locate where the HTTP body begins and write everything from there onward to
# the image file. If no header terminator is found the slice start is None,
# which keeps the whole buffer.
body_start = find_http_response_body(raw_http_data)
image_bytes = raw_http_data[body_start:]
save_image(image_bytes)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment