Last active
March 8, 2024 19:52
-
-
Save s3rgeym/70acbfe4b0da8a0790af709640a4e150 to your computer and use it in GitHub Desktop.
Что-то не работает отправка самодельных пакетов из под NATa
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# pylint: disable=C,R,W | |
from __future__ import annotations | |
import dataclasses | |
import ipaddress | |
import logging | |
import os | |
import random | |
import socket | |
import struct | |
import uuid | |
from abc import ABC, abstractmethod | |
from dataclasses import dataclass, field | |
from enum import IntEnum, IntFlag, auto | |
from io import BytesIO | |
from operator import methodcaller | |
from struct import Struct | |
from typing import Any, BinaryIO, ClassVar, NamedTuple, Self, Type, TypeVar | |
logger = logging.getLogger(__name__) | |
T = TypeVar("T") | |
class Packable(ABC): | |
@abstractmethod | |
def pack(self) -> bytes: ... | |
@classmethod | |
def unpack(cls: Type[T], buf: BinaryIO) -> T: | |
raise NotImplementedError | |
class Component(Packable): | |
def __add__(self, other: Component | Compound) -> Compound: | |
return Compound([self]) + other | |
@dataclass | |
class Compound(Packable): | |
items: list[Component] = field(default_factory=list) | |
def __getitem__(self, index: int) -> Component: | |
return self.items[index] | |
def __add__(self, item: Component | Compound) -> Self: | |
if isinstance(item, Compound): | |
self.items.extend(item.items) | |
else: | |
self.items.append(item) | |
return self | |
def pack(self) -> bytes: | |
return b"".join(map(methodcaller("pack"), self.items)) | |
# https://www.geeksforgeeks.org/options-field-in-tcp-header/ | |
# https://www.iana.org/assignments/tcp-parameters/tcp-parameters.xhtml | |
# Хоть выше и указано, что длина фиксированная, но это НЕ ТАК! | |
# https://stackoverflow.com/questions/42750552/read-tcp-options-fields | |
class TCPOption(NamedTuple): | |
class Kind(IntEnum): | |
EOL = 0 # end of option list | |
NOP = 1 | |
MSS = 2 | |
WIN_SCALE = 3 | |
SACK_PERMIT = 4 | |
SACK = 5 | |
TS = 8 | |
FAST_OPEN = 34 | |
UNKNOWN = 255 | |
@classmethod | |
def _missing_(cls, value: Any) -> TCPOption.Kind: | |
return cls.UNKNOWN | |
kind: Kind | |
data: bytes | |
@dataclass | |
class TCPHeader(Component): | |
class Flags(IntFlag): | |
FIN = 1 | |
SYN = auto() | |
RST = auto() | |
PSH = auto() | |
ACK = auto() | |
URG = auto() | |
ECE = auto() | |
CWR = auto() | |
src_port: int | |
dst_port: int | |
_: dataclasses.KW_ONLY | |
seq_num: int = 0 | |
ack_num: int = 0 | |
# размер заголовка в 32-битных словах. 5 (20 байт) - минимальный. 15 или 20 (точно не помню) - максимальный | |
data_off: int = 5 | |
reserved: int = 0 | |
flags: Flags = 0 | |
window: int = 32120 | |
check: int = 0 | |
urg_ptr: int = 0 | |
# распарсить эти свойства оказалось сложной задачей... | |
options: list[TCPOption] = field(default_factory=list) | |
struct: ClassVar = Struct("!HHII4H") | |
def __post_init__(self) -> None: | |
self.flags = self.Flags(self.flags) | |
def pack(self) -> bytes: | |
rv = self.struct.pack( | |
self.src_port, | |
self.dst_port, | |
self.seq_num, | |
self.ack_num, | |
((self.data_off & 0xF) << 12) | |
| (self.reserved & 0xF) << 8 | |
| (self.flags & 0xFF), | |
self.window, | |
self.check, | |
self.urg_ptr, | |
) | |
rv += self._pack_options() | |
return rv | |
def _pack_options(self) -> bytes: | |
rv = b"" | |
for option in self.options: | |
assert option[0] > 0 | |
rv += int(option[0]).to_bytes(1) | |
if option[0] == TCPOption.Kind.NOP: | |
continue | |
data = option[1] if len(option) > 1 else b"" | |
# осторожнее с этим. там есть числа, кодированные 4 байтами и тп | |
if isinstance(data, int): | |
data = data.to_bytes() | |
assert isinstance(data, bytes) | |
# длина данных + 2 байта для типа и самой длины | |
rv += (len(data) + 2).to_bytes(1) + data | |
return rv | |
def unpack_options(self, buf: BinaryIO) -> None: | |
if self.data_off == 5: | |
return | |
assert self.data_off > 5 | |
while ( | |
kind := TCPOption.Kind(int.from_bytes(buf.read(1))) | |
) != TCPOption.Kind.EOL: | |
logger.debug("read option: %s", kind.name) | |
data = None | |
if kind != TCPOption.Kind.NOP: | |
length = buf.read(1)[0] | |
assert kind != TCPOption.Kind.TS or ( | |
# Это поле всегда 10 байт | |
kind == TCPOption.Kind.TS | |
and length == 10 | |
) | |
# В длину учитываются: тип (1 байт) и длина (1 байт) | |
data = buf.read(length - 2) | |
self.options.append(TCPOption(kind, data)) | |
@classmethod | |
def unpack(cls: Type[TCPHeader], buf: BinaryIO) -> TCPHeader: | |
values = cls.struct.unpack(buf.read(cls.struct.size)) | |
hdr = cls( | |
src_port=values[0], | |
dst_port=values[1], | |
seq_num=values[2], | |
ack_num=values[3], | |
data_off=(values[4] >> 12) & 0b1111, | |
reserved=(values[4] >> 8) & 0b1111, | |
flags=values[4] & 0xFF, | |
window=values[5], | |
check=values[6], | |
urg_ptr=values[7], | |
) | |
# Некрасиво выглядит, но это я позже накосытлил | |
hdr.unpack_options(buf) | |
return hdr | |
# https://stackoverflow.com/a/66161909/2240578 | |
# https://gist.github.com/david-hoze/0c7021434796997a4ca42d7731a7073a | |
def compute_checksum(self, iph: IPHeader) -> None: | |
s = 0 | |
total_len = socket.ntohs(iph.total_len) - (iph.ihl << 2) | |
src_ip = int(ipaddress.ip_address(iph.src_ip)) | |
s += (src_ip >> 16) & 0xFFFF | |
s += src_ip & 0xFFFF | |
dst_ip = int(ipaddress.ip_address(iph.dst_ip)) | |
s += (dst_ip >> 16) & 0xFFFF | |
s += dst_ip & 0xFFFF | |
s += socket.htons(iph.protocol + total_len) | |
self.check = 0 | |
self.check = checksum(self.pack(), s) | |
@dataclass | |
class IPHeader(Component): | |
class Flags(IntEnum): | |
R = 0 | |
DF = 1 | |
MF = 2 | |
struct: ClassVar = Struct("!BB3HBBHLL") | |
src_ip: str | |
dst_ip: str | |
_: dataclasses.KW_ONLY | |
version: int = 4 | |
ihl: int = 5 # ip header length in 32-bits words. always 5 | |
tos: int = 0 # dscp + ecn | |
# размер пакета без учета Ethernet (14 байт занимает) | |
total_len: int = struct.size + TCPHeader.struct.size | |
ident: int = 0 | |
flags: Flags = 0 | |
frag_off: int = 0 | |
ttl: int = 64 | |
protocol: int = socket.IPPROTO_TCP | |
check: int = 0 | |
def __post_init__(self) -> None: | |
self.flags = self.Flags(self.flags) | |
@classmethod | |
def unpack(cls: type[IPHeader], buf: BinaryIO) -> IPHeader: | |
values = cls.struct.unpack(buf.read(cls.struct.size)) | |
return cls( | |
version=(values[0] >> 4) & 0xF, | |
ihl=values[0] & 0xF, | |
tos=values[1], | |
total_len=values[2], | |
ident=values[3], | |
flags=(values[4] >> 13) & 0b111, | |
frag_off=values[4] & 0b11111_11111_111, | |
ttl=values[5], | |
protocol=values[6], | |
check=values[7], | |
src_ip=str(ipaddress.ip_address(values[8])), | |
dst_ip=str(ipaddress.ip_address(values[9])), | |
) | |
def pack(self) -> bytes: | |
return self.struct.pack( | |
(self.version << 4) | self.ihl, | |
self.tos, | |
self.total_len, | |
self.ident, | |
(self.flags << 13) | self.frag_off, | |
self.ttl, | |
self.protocol, | |
self.check, | |
int(ipaddress.ip_address(self.src_ip)), | |
int(ipaddress.ip_address(self.dst_ip)), | |
) | |
def compute_checksum(self) -> None: | |
self.check = 0 | |
self.check = checksum(self.pack()) | |
class EtherType(IntEnum): | |
IPV4 = 0x800 | |
IPV6 = 0x86DD | |
def pack_mac_addr(s: str) -> bytes: | |
return bytes.fromhex(s.replace(":", "")) | |
def unpack_mac_addr(buf: BinaryIO) -> str: | |
if rv := buf.read(6): | |
assert len(rv) == 6 | |
return rv.hex(":") | |
raise ValueError("nothing to unpack") | |
@dataclass | |
class EtherHeader(Component): | |
dst_mac: str | |
src_mac: str | |
type: EtherType | |
def __post_init__(self) -> None: | |
self.type = EtherType(self.type) | |
def pack(self) -> bytes: | |
return ( | |
pack_mac_addr(self.dst_mac) | |
+ pack_mac_addr(self.src_mac) | |
+ int.to_bytes(self.type, 2) | |
) | |
@classmethod | |
def unpack(cls: Type[EtherHeader], buf: BinaryIO) -> EtherHeader: | |
return cls( | |
dst_mac=unpack_mac_addr(buf), | |
src_mac=unpack_mac_addr(buf), | |
type=int.from_bytes(buf.read(2)), | |
) | |
""" | |
u_short in_cksum(addr, len) | |
u_short *addr; | |
int len; | |
{ | |
int nleft, sum; | |
u_short *w; | |
union { | |
u_short us; | |
u_char uc[2]; | |
} last; | |
u_short answer; | |
nleft = len; | |
sum = 0; | |
w = addr; | |
/* | |
* Our algorithm is simple, using a 32 bit accumulator (sum), we add | |
* sequential 16 bit words to it, and at the end, fold back all the | |
* carry bits from the top 16 bits into the lower 16 bits. | |
*/ | |
while (nleft > 1) { | |
sum += *w++; | |
nleft -= 2; | |
} | |
/* mop up an odd byte, if necessary */ | |
if (nleft == 1) { | |
last.uc[0] = *(u_char *)w; | |
last.uc[1] = 0; | |
sum += last.us; | |
} | |
/* add back carry outs from top 16 bits to low 16 bits */ | |
sum = (sum >> 16) + (sum & 0xffff); /* add hi 16 to low 16 */ | |
sum += (sum >> 16); /* add carry */ | |
answer = ~sum; /* truncate to 16 bits */ | |
return(answer); | |
} | |
TCP/IP ILLUSTRATED: | |
VOL. 2. THE IMPLEMENTATION | |
by Gary R. Wright and W. Richard Stevens | |
""" | |
# https://ru.stackoverflow.com/a/1024519/239593 | |
def checksum(data: bytes, s: int = 0) -> int: | |
# добиваем нулл-байтом, чтобы обработать odd byte | |
s += sum(struct.unpack(f"!{len(data)//2}H", data)) | |
while s > 0xFFFF: | |
s = (s & 0xFFFF) + (s >> 16) | |
return 0xFFFF - s | |
def get_local_ip() -> str: | |
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s: | |
s.connect(("8.8.8.8", 53)) | |
return s.getsockname()[0] | |
def get_free_port() -> int: | |
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: | |
try: | |
s.connect(("", 0)) | |
except ConnectionRefusedError: | |
pass | |
return s.getsockname()[1] | |
def get_device_mac_address() -> str: | |
return uuid.getnode().to_bytes(6).hex(":") | |
# __debug__=False - для игнорирования блоков assert | |
# Запрос к github.com, снятый WireShark: | |
SYN_DATA = bytes.fromhex( | |
"340804c2cf2a74563c2f84ac08004500003c15964000400665aac0a800688c527219c35801bb195cc2ad00000000a0027d78bfaa0000020405b40402080a2ecfe52e0000000001030307" | |
) | |
buf = BytesIO(SYN_DATA) | |
eth = EtherHeader.unpack(buf) | |
# print(f"{eth=}") | |
data = SYN_DATA[:14] | |
p = eth.pack() | |
# print("original :", data.hex(" ")) | |
# print("packed :", p.hex(" ")) | |
assert data == p | |
iph = IPHeader.unpack(buf) | |
# print(f"{iph=}") | |
assert iph.check == 0x65AA | |
assert iph.dst_ip == "140.82.114.25" | |
assert iph.flags == IPHeader.Flags.MF | |
assert iph.protocol == socket.IPPROTO_TCP | |
assert iph.src_ip == "192.168.0.104" | |
assert iph.total_len == 60 | |
assert iph.ttl == 64 | |
assert iph.version == 4 | |
assert iph.ident == 5526 | |
data = SYN_DATA[14:34] | |
p = iph.pack() | |
# print("original :", data.hex(" ")) | |
# print("packed :", p.hex(" ")) | |
assert data == p | |
# при расчете контрольной суммы нужно сбросить поле в 0 | |
# можно и опустить расчеты | |
# check += changed_value - old_value | |
old_value = iph.check | |
iph.compute_checksum() | |
assert old_value == iph.check | |
tcph = TCPHeader.unpack(buf) | |
logger.debug("buffer is empty: %r", buf.tell() == len(SYN_DATA)) | |
assert tcph.ack_num == 0 | |
assert tcph.check == 0xBFAA | |
assert tcph.data_off == 10 | |
assert tcph.dst_port == 443 | |
assert tcph.seq_num == 425509549 | |
assert tcph.src_port == 50008 | |
assert tcph.window == 32120 | |
data = SYN_DATA[34:] | |
p = tcph.pack() | |
# print("original :", data.hex(" ")) | |
# print("packed :", p.hex(" ")) | |
assert data == p | |
old_value = tcph.check | |
tcph.compute_checksum(iph) | |
assert old_value == tcph.check | |
assert (iph + tcph).pack() == SYN_DATA[14:] | |
# Ответ на SYN-запрос: | |
SYN_ACK_DATA = bytes.fromhex( | |
"74563c2f84ac340804c2cf2a08004500003c000040002a0691408c527219c0a8006801bbc358e9e8ede5195cc2aea012ffff524400000204059c0402080a8fe619732ecfe52e0103030a" | |
) | |
buf = BytesIO(SYN_ACK_DATA) | |
buf.seek(14, os.SEEK_CUR) | |
iph2 = IPHeader.unpack(buf) | |
# print(f"{iph2=}") | |
assert iph.src_ip == iph2.dst_ip | |
assert iph.dst_ip == iph2.src_ip | |
tcph2 = TCPHeader.unpack(buf) | |
# print(f"{tcph2=}") | |
assert tcph2.flags == TCPHeader.Flags.SYN | TCPHeader.Flags.ACK | |
data = SYN_ACK_DATA[34:] | |
packed = tcph2.pack() | |
# print("origin :", data.hex(" ")) | |
# print("packed :", packed.hex(" ")) | |
assert tcph2.pack() == data | |
# Все assert'ы выше вынести в тесты | |
import argparse | |
import random | |
import secrets | |
import sys | |
parser = argparse.ArgumentParser(description="Send SYN packet") | |
parser.add_argument("-d", "--debug", action="store_true") | |
parser.add_argument("address") | |
parser.add_argument("port", type=int) | |
args = parser.parse_args() | |
if args.debug: | |
logging.basicConfig() | |
logger.setLevel(logging.DEBUG) | |
logger.info("start") | |
src_ip = get_local_ip() | |
src_port = random.randint(30000, 50000) | |
logger.debug("local ip address : %s", src_ip) | |
logger.debug("local port : %d", src_port) | |
dst_ip = socket.gethostbyname(args.address) | |
dst_port = args.port | |
logger.debug("remote ip address : %s", dst_ip) | |
logger.debug("remote port : %d", dst_port) | |
ip_header = IPHeader( | |
src_ip, | |
dst_ip, | |
total_len=60, | |
# ident=secrets.randbits(16), | |
flags=IPHeader.Flags.MF, | |
) | |
# чексумму подставит роутер,если ее не указать | |
ip_header.compute_checksum() | |
options = [ | |
(TCPOption.Kind.MSS, bytes.fromhex("05b4")), # 1460 | |
(TCPOption.Kind.SACK_PERMIT,), | |
# Не знаю что в TSVal поставить | |
(TCPOption.Kind.TS, secrets.randbits(32).to_bytes(4) + b"\0\0\0\0"), | |
(TCPOption.Kind.NOP,), | |
# (TCPOption.Kind.WIN_SCALE, 7), | |
(TCPOption.Kind.WIN_SCALE, 0), | |
] | |
tcp_header = TCPHeader( | |
src_port, | |
dst_port, | |
seq_num=secrets.randbits(32), | |
flags=TCPHeader.Flags.SYN, | |
data_off=10, # 40 bytes, | |
options=options, | |
) | |
tcp_header.compute_checksum(ip_header) | |
syn_packet = ip_header + tcp_header | |
logger.debug(syn_packet) | |
data = syn_packet.pack() | |
logger.debug("packet data : %s", data.hex(" ")) | |
logger.debug("packet size : %d", len(data)) | |
# assert len(data) == ip_header.total_len | |
try: | |
# socket.IPPROTO_TCP чтобы сниффить | |
s = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_RAW) | |
except PermissionError as ex: | |
raise RuntimeError("root priveleges are required") | |
# просим не добавлять ip заголовки при отправке (для socket.IPPROTO_RAW не нужно) | |
# s.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1) | |
# ip.dst == 178.248.233.6 and tcp.flags.syn == 1 (www.linux.org) | |
# ip.dst == 85.119.149.3 ? | |
# ip.dst == 93.184.216.34 (example.com) | |
if 0 == (n_bytes := s.sendto(data, (ip_header.dst_ip, 0))): | |
raise RuntimeError("packet was not sent") | |
logger.debug("bytes sent: %d", n_bytes) | |
response, addr = s.recvfrom(65535) | |
logger.debug("response : %s", response.hex(" ")) | |
logger.info("finished") |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment