Last active
March 15, 2024 11:27
-
-
Save s3rgeym/89477852a8a63dd789da91b5ec63c83c to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# pylint: disable=C,R,W | |
import logging | |
import secrets | |
import socket | |
logger = logging.getLogger(__name__) | |
def get_local_ip() -> str: | |
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock: | |
sock.connect(("8.8.8.8", 53)) | |
return sock.getsockname()[0] | |
# В WireShark нужно поставить галочку Protocol Preferences -> Validate tche TCP checksum if possible | |
def checksum(data: bytes) -> int: | |
s = 0 | |
for i in range(0, len(data), 2): | |
s += (data[i] << 8) + data[i + 1] | |
while s >> 16: | |
s = (s & 0xFFFF) + (s >> 16) | |
return s ^ 0xFFFF | |
def make_ip_header( | |
source_ip: str, dest_ip: str, ident=54321, total_len: int = 40 | |
) -> bytes: | |
""" | |
0 1 2 3 | |
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 | |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | |
|Version| IHL |Type of Service| Total Length | | |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | |
| Identification |Flags| Fragment Offset | | |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | |
| Time to Live | Protocol | Header Checksum | | |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | |
| Source Address | | |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | |
| Destination Address | | |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | |
| Options | Padding | | |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | |
https://www.rfc-editor.org/rfc/rfc791#section-3.1 | |
""" | |
logger.debug(f"{source_ip=}") | |
logger.debug(f"{source_port=}") | |
logger.debug(f"{ident=:x}") | |
return ( | |
bytes.fromhex("45 00") | |
+ total_len.to_bytes(2) | |
+ ident.to_bytes(2) | |
+ bytes.fromhex("40 00 40 06") # flags + ttl + protocol | |
+ b"\0\0" # checksum | |
+ socket.inet_aton(source_ip) | |
+ socket.inet_aton(dest_ip) | |
) | |
def make_tcp_packet( | |
source_ip: str, | |
source_port: int, | |
dest_ip: str, | |
dest_port: int, | |
) -> bytes: | |
""" | |
0 1 2 3 | |
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 | |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | |
| Source Port | Destination Port | | |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | |
| Sequence Number | | |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | |
| Acknowledgment Number | | |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | |
| Data | |U|A|P|R|S|F| | | |
| Offset| Reserved |R|C|S|S|Y|I| Window | | |
| | |G|K|H|T|N|N| | | |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | |
| Checksum | Urgent Pointer | | |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | |
| Options | Padding | | |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | |
| data | | |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | |
https://datatracker.ietf.org/doc/html/rfc793#section-3.1 | |
""" | |
# Для ip header чексуму можно не считать, ее ядро само подставит | |
iph = make_ip_header(source_ip, dest_ip, secrets.randbits(16)) | |
assert len(iph) == 20 | |
tcph = ( | |
source_port.to_bytes(2) | |
+ dest_port.to_bytes(2) | |
+ secrets.randbits(32).to_bytes(4) # seq num | |
+ b"\0\0\0\0" # ack num | |
# 5 - data offset в 4-байтных словах, 2 - SYN флаг | |
+ ((5 << 12) | 2).to_bytes(2) | |
+ (32_120).to_bytes(2) # window size | |
+ b"\0\0\0\0" # checksum + urgent pointer | |
) | |
assert len(tcph) == 20 | |
pseudo_iph = ( | |
socket.inet_aton(source_ip) | |
+ socket.inet_aton(dest_ip) | |
+ socket.IPPROTO_TCP.to_bytes(2) # b'\x06\x00' | |
+ len(tcph).to_bytes(2) | |
) | |
check = checksum(pseudo_iph + tcph) | |
logger.debug("tcp checksum: %x", check) | |
return iph + tcph[:16] + check.to_bytes(2) + tcph[18:] | |
import argparse | |
import logging | |
import random | |
parser = argparse.ArgumentParser() | |
parser.add_argument("address", type=socket.gethostbyname) | |
parser.add_argument("port", type=int) | |
args = parser.parse_args() | |
logging.basicConfig() | |
logger.setLevel(logging.DEBUG) | |
source_ip = get_local_ip() | |
source_port = random.randint(20000, 50000) | |
packet = make_tcp_packet(source_ip, source_port, args.address, args.port) | |
logger.debug("raw packet: %s", packet.hex(" ")) | |
s = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_RAW) | |
n = s.sendto(packet, (args.address, 0)) | |
logger.debug("bytes sent: %d", n) | |
assert n > 0 | |
response, addr = s.recvfrom(1024) | |
logger.debug(addr) | |
logger.debug("raw response: %s", response.hex(" ")) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment