Skip to content

Instantly share code, notes, and snippets.

@ArjixWasTaken
Last active June 4, 2022 19:14
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ArjixWasTaken/10982c4eb2aee3f73e74df7c6f535591 to your computer and use it in GitHub Desktop.
Save ArjixWasTaken/10982c4eb2aee3f73e74df7c6f535591 to your computer and use it in GitHub Desktop.
windows_packet_reader
import struct
import socket
from time import sleep
from pprint import pprint
# Number of packets seen per sequence-number field (raw 4 bytes -> count).
PACKET_GROUPS: dict[bytes, int] = {}


class Packet:
    """Field-by-field view of a raw TCP segment.

    The constructor expects ``packet`` to start at the first byte of the TCP
    header (i.e. any IP header must already have been removed by the caller).
    Fields are consumed sequentially from two cursors that are kept in
    lock-step: a byte buffer for whole-byte fields and a bit list for the
    sub-byte fields (data offset, reserved bits, control flags).

    Every successfully parsed packet also increments ``PACKET_GROUPS`` under
    its ``serial_identifier``.

    Raises:
        ValueError: if ``packet`` is shorter than the fixed 20-byte TCP header.
    """

    # Size of the TCP header without options, in bytes.
    _MIN_HEADER_LENGTH = 20

    source_port_number: int
    destination_port_number: int
    serial_identifier: bytes  # the 32-bit sequence-number field, kept raw
    acknowledgement_number: int
    data_offset: int  # header length in 32-bit words
    reserved: int
    control_flags: list[int]  # 9 bits: NS CWR ECE URG ACK PSH RST SYN FIN
    window_size: int
    checksum: bytes
    urgent_pointer: bytes
    optional: bytes  # TCP options (may be empty)
    data: bytes  # payload following the header

    def __init__(self, packet):
        if len(packet) < self._MIN_HEADER_LENGTH:
            raise ValueError(
                f"TCP segment too short: {len(packet)} bytes, "
                f"need at least {self._MIN_HEADER_LENGTH}"
            )

        self.packet = packet
        # Unconsumed bytes and unconsumed bits; both always describe the same
        # position in the segment (see ___read_bytes / ___read_bits).
        self.___packet = packet
        self.___bits = [int(bit) for byte in packet for bit in f"{byte:08b}"]

        self.source_port_number = self.___read_bytes_as_int(2)
        self.destination_port_number = self.___read_bytes_as_int(2)
        self.serial_identifier = self.___read_bytes(4)
        self.acknowledgement_number = self.___read_bytes_as_int(4)
        # These three fields share two bytes (4 + 3 + 9 = 16 bits).
        self.data_offset = self.___read_bits_as_int(4)
        self.reserved = self.___read_bits_as_int(3)
        self.control_flags = self.___read_bits(9)
        self.window_size = self.___read_bytes_as_int(2)
        self.checksum = self.___read_bytes(2)
        self.urgent_pointer = self.___read_bytes(2)

        # Options fill whatever part of the header exceeds the fixed 20
        # bytes; data_offset counts 32-bit words.  Clamp so a bogus offset
        # (< 5) yields no options instead of a negative length.
        options_length = max(
            self.data_offset * 4 - self._MIN_HEADER_LENGTH, 0
        )
        self.optional = self.___read_bytes(options_length)
        self.data = self.___packet

        PACKET_GROUPS[self.serial_identifier] = (
            PACKET_GROUPS.get(self.serial_identifier, 0) + 1
        )

    def ___read_bytes(self, length) -> bytes:
        """Consume and return the next ``length`` bytes.

        Raises:
            ValueError: if the bit cursor is not on a byte boundary.
        """
        if len(self.___bits) % 8:
            raise ValueError("cannot read whole bytes at an unaligned bit position")
        buffer = self.___packet[:length]
        self.___packet = self.___packet[length:]
        # Advance the bit cursor by the same amount: 8 bits per byte.
        del self.___bits[:length * 8]
        return buffer

    def ___read_bytes_as_int(self, length) -> int:
        """Consume ``length`` bytes and decode them as a big-endian integer."""
        return int.from_bytes(self.___read_bytes(length), byteorder='big')

    def ___read_bits(self, length) -> list[int]:
        """Consume and return the next ``length`` bits as a list of 0/1 ints."""
        buffer = self.___bits[:length]
        del self.___bits[:length]
        # Keep the byte buffer in sync: it holds only bytes that no bit read
        # has touched yet, i.e. the last len(bits) // 8 bytes.
        untouched = len(self.___bits) // 8
        self.___packet = self.___packet[len(self.___packet) - untouched:]
        return buffer

    def ___read_bits_as_int(self, length):
        """Consume ``length`` bits and decode them as an unsigned integer."""
        return int("".join(str(bit) for bit in self.___read_bits(length)), 2)
# Windows-only sniffer: a raw IP socket bound to this host's address is put
# into receive-all mode and every captured TCP segment is parsed and printed.
# Ctrl+C stops the capture and prints the per-sequence-number packet counts.
with socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_IP) as s:
    s.bind((socket.gethostbyname(socket.gethostname()), 0))
    s.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1)
    s.ioctl(socket.SIO_RCVALL, socket.RCVALL_ON)
    try:
        while True:
            # 65535 is the largest possible IPv4 datagram.
            data = s.recvfrom(65535)
            datagram = data[0]

            # Need at least the fixed 20-byte IPv4 header to inspect it.
            if len(datagram) < 20:
                continue
            # Byte 9 of the IPv4 header is the protocol; 6 means TCP.
            if datagram[9] != 6:
                continue
            # A non-zero fragment offset (low 13 bits of bytes 6-7) means
            # this is a later fragment that carries no TCP header.
            if int.from_bytes(datagram[6:8], byteorder='big') & 0x1FFF:
                continue

            # The low nibble of byte 0 is the IP header length in 32-bit
            # words; the TCP segment starts right after it.
            ip_header_length = (datagram[0] & 0x0F) * 4
            if ip_header_length < 20:
                continue
            segment = datagram[ip_header_length:]
            # Skip segments too short to hold the fixed TCP header.
            if len(segment) < 20:
                continue

            # Print the public fields only; the name-mangled parser state
            # is filtered out of a copy rather than deleted from the object.
            packet = {
                name: value
                for name, value in vars(Packet(segment)).items()  # noqa
                if not name.startswith('_Packet__')
            }
            pprint(packet)
            print('\n\n')
            sleep(0.5)
    except KeyboardInterrupt:
        print("Stopped listening for packets")
        pprint(PACKET_GROUPS)
        raise SystemExit
    finally:
        # Always leave receive-all mode, even on an unexpected error.
        s.ioctl(socket.SIO_RCVALL, socket.RCVALL_OFF)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment