Skip to content

Instantly share code, notes, and snippets.

@s3rgeym
Last active March 8, 2024 19:52
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save s3rgeym/70acbfe4b0da8a0790af709640a4e150 to your computer and use it in GitHub Desktop.
Save s3rgeym/70acbfe4b0da8a0790af709640a4e150 to your computer and use it in GitHub Desktop.
Что-то не работает отправка самодельных пакетов из под NATa
# pylint: disable=C,R,W
from __future__ import annotations
import dataclasses
import ipaddress
import logging
import os
import random
import socket
import struct
import uuid
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from enum import IntEnum, IntFlag, auto
from io import BytesIO
from operator import methodcaller
from struct import Struct
from typing import Any, BinaryIO, ClassVar, NamedTuple, Self, Type, TypeVar
logger = logging.getLogger(__name__)
T = TypeVar("T")
class Packable(ABC):
@abstractmethod
def pack(self) -> bytes: ...
@classmethod
def unpack(cls: Type[T], buf: BinaryIO) -> T:
raise NotImplementedError
class Component(Packable):
def __add__(self, other: Component | Compound) -> Compound:
return Compound([self]) + other
@dataclass
class Compound(Packable):
items: list[Component] = field(default_factory=list)
def __getitem__(self, index: int) -> Component:
return self.items[index]
def __add__(self, item: Component | Compound) -> Self:
if isinstance(item, Compound):
self.items.extend(item.items)
else:
self.items.append(item)
return self
def pack(self) -> bytes:
return b"".join(map(methodcaller("pack"), self.items))
# https://www.geeksforgeeks.org/options-field-in-tcp-header/
# https://www.iana.org/assignments/tcp-parameters/tcp-parameters.xhtml
# Хоть выше и указано, что длина фиксированная, но это НЕ ТАК!
# https://stackoverflow.com/questions/42750552/read-tcp-options-fields
class TCPOption(NamedTuple):
class Kind(IntEnum):
EOL = 0 # end of option list
NOP = 1
MSS = 2
WIN_SCALE = 3
SACK_PERMIT = 4
SACK = 5
TS = 8
FAST_OPEN = 34
UNKNOWN = 255
@classmethod
def _missing_(cls, value: Any) -> TCPOption.Kind:
return cls.UNKNOWN
kind: Kind
data: bytes
@dataclass
class TCPHeader(Component):
class Flags(IntFlag):
FIN = 1
SYN = auto()
RST = auto()
PSH = auto()
ACK = auto()
URG = auto()
ECE = auto()
CWR = auto()
src_port: int
dst_port: int
_: dataclasses.KW_ONLY
seq_num: int = 0
ack_num: int = 0
# размер заголовка в 32-битных словах. 5 (20 байт) - минимальный. 15 или 20 (точно не помню) - максимальный
data_off: int = 5
reserved: int = 0
flags: Flags = 0
window: int = 32120
check: int = 0
urg_ptr: int = 0
# распарсить эти свойства оказалось сложной задачей...
options: list[TCPOption] = field(default_factory=list)
struct: ClassVar = Struct("!HHII4H")
def __post_init__(self) -> None:
self.flags = self.Flags(self.flags)
def pack(self) -> bytes:
rv = self.struct.pack(
self.src_port,
self.dst_port,
self.seq_num,
self.ack_num,
((self.data_off & 0xF) << 12)
| (self.reserved & 0xF) << 8
| (self.flags & 0xFF),
self.window,
self.check,
self.urg_ptr,
)
rv += self._pack_options()
return rv
def _pack_options(self) -> bytes:
rv = b""
for option in self.options:
assert option[0] > 0
rv += int(option[0]).to_bytes(1)
if option[0] == TCPOption.Kind.NOP:
continue
data = option[1] if len(option) > 1 else b""
# осторожнее с этим. там есть числа, кодированные 4 байтами и тп
if isinstance(data, int):
data = data.to_bytes()
assert isinstance(data, bytes)
# длина данных + 2 байта для типа и самой длины
rv += (len(data) + 2).to_bytes(1) + data
return rv
def unpack_options(self, buf: BinaryIO) -> None:
if self.data_off == 5:
return
assert self.data_off > 5
while (
kind := TCPOption.Kind(int.from_bytes(buf.read(1)))
) != TCPOption.Kind.EOL:
logger.debug("read option: %s", kind.name)
data = None
if kind != TCPOption.Kind.NOP:
length = buf.read(1)[0]
assert kind != TCPOption.Kind.TS or (
# Это поле всегда 10 байт
kind == TCPOption.Kind.TS
and length == 10
)
# В длину учитываются: тип (1 байт) и длина (1 байт)
data = buf.read(length - 2)
self.options.append(TCPOption(kind, data))
@classmethod
def unpack(cls: Type[TCPHeader], buf: BinaryIO) -> TCPHeader:
values = cls.struct.unpack(buf.read(cls.struct.size))
hdr = cls(
src_port=values[0],
dst_port=values[1],
seq_num=values[2],
ack_num=values[3],
data_off=(values[4] >> 12) & 0b1111,
reserved=(values[4] >> 8) & 0b1111,
flags=values[4] & 0xFF,
window=values[5],
check=values[6],
urg_ptr=values[7],
)
# Некрасиво выглядит, но это я позже накосытлил
hdr.unpack_options(buf)
return hdr
# https://stackoverflow.com/a/66161909/2240578
# https://gist.github.com/david-hoze/0c7021434796997a4ca42d7731a7073a
def compute_checksum(self, iph: IPHeader) -> None:
s = 0
total_len = socket.ntohs(iph.total_len) - (iph.ihl << 2)
src_ip = int(ipaddress.ip_address(iph.src_ip))
s += (src_ip >> 16) & 0xFFFF
s += src_ip & 0xFFFF
dst_ip = int(ipaddress.ip_address(iph.dst_ip))
s += (dst_ip >> 16) & 0xFFFF
s += dst_ip & 0xFFFF
s += socket.htons(iph.protocol + total_len)
self.check = 0
self.check = checksum(self.pack(), s)
@dataclass
class IPHeader(Component):
class Flags(IntEnum):
R = 0
DF = 1
MF = 2
struct: ClassVar = Struct("!BB3HBBHLL")
src_ip: str
dst_ip: str
_: dataclasses.KW_ONLY
version: int = 4
ihl: int = 5 # ip header length in 32-bits words. always 5
tos: int = 0 # dscp + ecn
# размер пакета без учета Ethernet (14 байт занимает)
total_len: int = struct.size + TCPHeader.struct.size
ident: int = 0
flags: Flags = 0
frag_off: int = 0
ttl: int = 64
protocol: int = socket.IPPROTO_TCP
check: int = 0
def __post_init__(self) -> None:
self.flags = self.Flags(self.flags)
@classmethod
def unpack(cls: type[IPHeader], buf: BinaryIO) -> IPHeader:
values = cls.struct.unpack(buf.read(cls.struct.size))
return cls(
version=(values[0] >> 4) & 0xF,
ihl=values[0] & 0xF,
tos=values[1],
total_len=values[2],
ident=values[3],
flags=(values[4] >> 13) & 0b111,
frag_off=values[4] & 0b11111_11111_111,
ttl=values[5],
protocol=values[6],
check=values[7],
src_ip=str(ipaddress.ip_address(values[8])),
dst_ip=str(ipaddress.ip_address(values[9])),
)
def pack(self) -> bytes:
return self.struct.pack(
(self.version << 4) | self.ihl,
self.tos,
self.total_len,
self.ident,
(self.flags << 13) | self.frag_off,
self.ttl,
self.protocol,
self.check,
int(ipaddress.ip_address(self.src_ip)),
int(ipaddress.ip_address(self.dst_ip)),
)
def compute_checksum(self) -> None:
self.check = 0
self.check = checksum(self.pack())
class EtherType(IntEnum):
IPV4 = 0x800
IPV6 = 0x86DD
def pack_mac_addr(s: str) -> bytes:
return bytes.fromhex(s.replace(":", ""))
def unpack_mac_addr(buf: BinaryIO) -> str:
if rv := buf.read(6):
assert len(rv) == 6
return rv.hex(":")
raise ValueError("nothing to unpack")
@dataclass
class EtherHeader(Component):
dst_mac: str
src_mac: str
type: EtherType
def __post_init__(self) -> None:
self.type = EtherType(self.type)
def pack(self) -> bytes:
return (
pack_mac_addr(self.dst_mac)
+ pack_mac_addr(self.src_mac)
+ int.to_bytes(self.type, 2)
)
@classmethod
def unpack(cls: Type[EtherHeader], buf: BinaryIO) -> EtherHeader:
return cls(
dst_mac=unpack_mac_addr(buf),
src_mac=unpack_mac_addr(buf),
type=int.from_bytes(buf.read(2)),
)
"""
u_short in_cksum(addr, len)
u_short *addr;
int len;
{
int nleft, sum;
u_short *w;
union {
u_short us;
u_char uc[2];
} last;
u_short answer;
nleft = len;
sum = 0;
w = addr;
/*
* Our algorithm is simple, using a 32 bit accumulator (sum), we add
* sequential 16 bit words to it, and at the end, fold back all the
* carry bits from the top 16 bits into the lower 16 bits.
*/
while (nleft > 1) {
sum += *w++;
nleft -= 2;
}
/* mop up an odd byte, if necessary */
if (nleft == 1) {
last.uc[0] = *(u_char *)w;
last.uc[1] = 0;
sum += last.us;
}
/* add back carry outs from top 16 bits to low 16 bits */
sum = (sum >> 16) + (sum & 0xffff); /* add hi 16 to low 16 */
sum += (sum >> 16); /* add carry */
answer = ~sum; /* truncate to 16 bits */
return(answer);
}
TCP/IP ILLUSTRATED:
VOL. 2. THE IMPLEMENTATION
by Gary R. Wright and W. Richard Stevens
"""
# https://ru.stackoverflow.com/a/1024519/239593
def checksum(data: bytes, s: int = 0) -> int:
# добиваем нулл-байтом, чтобы обработать odd byte
s += sum(struct.unpack(f"!{len(data)//2}H", data))
while s > 0xFFFF:
s = (s & 0xFFFF) + (s >> 16)
return 0xFFFF - s
def get_local_ip() -> str:
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
s.connect(("8.8.8.8", 53))
return s.getsockname()[0]
def get_free_port() -> int:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
try:
s.connect(("", 0))
except ConnectionRefusedError:
pass
return s.getsockname()[1]
def get_device_mac_address() -> str:
return uuid.getnode().to_bytes(6).hex(":")
# __debug__=False - для игнорирования блоков assert
# Запрос к github.com, снятый WireShark:
SYN_DATA = bytes.fromhex(
"340804c2cf2a74563c2f84ac08004500003c15964000400665aac0a800688c527219c35801bb195cc2ad00000000a0027d78bfaa0000020405b40402080a2ecfe52e0000000001030307"
)
buf = BytesIO(SYN_DATA)
eth = EtherHeader.unpack(buf)
# print(f"{eth=}")
data = SYN_DATA[:14]
p = eth.pack()
# print("original :", data.hex(" "))
# print("packed :", p.hex(" "))
assert data == p
iph = IPHeader.unpack(buf)
# print(f"{iph=}")
assert iph.check == 0x65AA
assert iph.dst_ip == "140.82.114.25"
assert iph.flags == IPHeader.Flags.MF
assert iph.protocol == socket.IPPROTO_TCP
assert iph.src_ip == "192.168.0.104"
assert iph.total_len == 60
assert iph.ttl == 64
assert iph.version == 4
assert iph.ident == 5526
data = SYN_DATA[14:34]
p = iph.pack()
# print("original :", data.hex(" "))
# print("packed :", p.hex(" "))
assert data == p
# при расчете контрольной суммы нужно сбросить поле в 0
# можно и опустить расчеты
# check += changed_value - old_value
old_value = iph.check
iph.compute_checksum()
assert old_value == iph.check
tcph = TCPHeader.unpack(buf)
logger.debug("buffer is empty: %r", buf.tell() == len(SYN_DATA))
assert tcph.ack_num == 0
assert tcph.check == 0xBFAA
assert tcph.data_off == 10
assert tcph.dst_port == 443
assert tcph.seq_num == 425509549
assert tcph.src_port == 50008
assert tcph.window == 32120
data = SYN_DATA[34:]
p = tcph.pack()
# print("original :", data.hex(" "))
# print("packed :", p.hex(" "))
assert data == p
old_value = tcph.check
tcph.compute_checksum(iph)
assert old_value == tcph.check
assert (iph + tcph).pack() == SYN_DATA[14:]
# Ответ на SYN-запрос:
SYN_ACK_DATA = bytes.fromhex(
"74563c2f84ac340804c2cf2a08004500003c000040002a0691408c527219c0a8006801bbc358e9e8ede5195cc2aea012ffff524400000204059c0402080a8fe619732ecfe52e0103030a"
)
buf = BytesIO(SYN_ACK_DATA)
buf.seek(14, os.SEEK_CUR)
iph2 = IPHeader.unpack(buf)
# print(f"{iph2=}")
assert iph.src_ip == iph2.dst_ip
assert iph.dst_ip == iph2.src_ip
tcph2 = TCPHeader.unpack(buf)
# print(f"{tcph2=}")
assert tcph2.flags == TCPHeader.Flags.SYN | TCPHeader.Flags.ACK
data = SYN_ACK_DATA[34:]
packed = tcph2.pack()
# print("origin :", data.hex(" "))
# print("packed :", packed.hex(" "))
assert tcph2.pack() == data
# Все assert'ы выше вынести в тесты
import argparse
import random
import secrets
import sys
parser = argparse.ArgumentParser(description="Send SYN packet")
parser.add_argument("-d", "--debug", action="store_true")
parser.add_argument("address")
parser.add_argument("port", type=int)
args = parser.parse_args()
if args.debug:
logging.basicConfig()
logger.setLevel(logging.DEBUG)
logger.info("start")
src_ip = get_local_ip()
src_port = random.randint(30000, 50000)
logger.debug("local ip address : %s", src_ip)
logger.debug("local port : %d", src_port)
dst_ip = socket.gethostbyname(args.address)
dst_port = args.port
logger.debug("remote ip address : %s", dst_ip)
logger.debug("remote port : %d", dst_port)
ip_header = IPHeader(
src_ip,
dst_ip,
total_len=60,
# ident=secrets.randbits(16),
flags=IPHeader.Flags.MF,
)
# чексумму подставит роутер,если ее не указать
ip_header.compute_checksum()
options = [
(TCPOption.Kind.MSS, bytes.fromhex("05b4")), # 1460
(TCPOption.Kind.SACK_PERMIT,),
# Не знаю что в TSVal поставить
(TCPOption.Kind.TS, secrets.randbits(32).to_bytes(4) + b"\0\0\0\0"),
(TCPOption.Kind.NOP,),
# (TCPOption.Kind.WIN_SCALE, 7),
(TCPOption.Kind.WIN_SCALE, 0),
]
tcp_header = TCPHeader(
src_port,
dst_port,
seq_num=secrets.randbits(32),
flags=TCPHeader.Flags.SYN,
data_off=10, # 40 bytes,
options=options,
)
tcp_header.compute_checksum(ip_header)
syn_packet = ip_header + tcp_header
logger.debug(syn_packet)
data = syn_packet.pack()
logger.debug("packet data : %s", data.hex(" "))
logger.debug("packet size : %d", len(data))
# assert len(data) == ip_header.total_len
try:
# socket.IPPROTO_TCP чтобы сниффить
s = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_RAW)
except PermissionError as ex:
raise RuntimeError("root priveleges are required")
# просим не добавлять ip заголовки при отправке (для socket.IPPROTO_RAW не нужно)
# s.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1)
# ip.dst == 178.248.233.6 and tcp.flags.syn == 1 (www.linux.org)
# ip.dst == 85.119.149.3 ?
# ip.dst == 93.184.216.34 (example.com)
if 0 == (n_bytes := s.sendto(data, (ip_header.dst_ip, 0))):
raise RuntimeError("packet was not sent")
logger.debug("bytes sent: %d", n_bytes)
response, addr = s.recvfrom(65535)
logger.debug("response : %s", response.hex(" "))
logger.info("finished")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment