Skip to content

Instantly share code, notes, and snippets.

@s3rgeym
Created March 18, 2024 01:26
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save s3rgeym/705fb1a564ba68465c859f23c5b157e1 to your computer and use it in GitHub Desktop.
Save s3rgeym/705fb1a564ba68465c859f23c5b157e1 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
# pylint: disable=C,R,W
import argparse
import logging
import random
import secrets
import socket
import sys
import threading
logger = logging.getLogger(__name__)
def get_local_ip() -> str:
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
sock.connect(("8.8.8.8", 53))
return sock.getsockname()[0]
# В WireShark нужно поставить галочку Protocol Preferences -> Validate tche TCP checksum if possible
def checksum(data: bytes) -> int:
s = 0
for i in range(0, len(data), 2):
s += (data[i] << 8) + data[i + 1]
while s >> 16:
s = (s & 0xFFFF) + (s >> 16)
return s ^ 0xFFFF
def make_ip_header(
src_ip: str,
dst_ip: str,
ident=0,
tot_len: int = 40,
) -> bytes:
"""
0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|Version| IHL |Type of Service| Total Length |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| Identification |Flags| Fragment Offset |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| Time to Live | Protocol | Header Checksum |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| Source Address |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| Destination Address |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| Options | Padding |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
https://www.rfc-editor.org/rfc/rfc791#section-3.1
"""
logger.debug(f"{src_ip=}")
logger.debug(f"{dst_ip=}")
logger.debug(f"{ident=:x}")
return (
bytes.fromhex("45 00")
+ tot_len.to_bytes(2)
+ ident.to_bytes(2)
+ bytes.fromhex("40 00 40 06") # flags + ttl + protocol
+ b"\0\0" # checksum
+ socket.inet_aton(src_ip)
+ socket.inet_aton(dst_ip)
)
def make_syn_packet(
src_ip: str,
src_port: int,
dst_ip: str,
dst_port: int,
) -> bytes:
"""
0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| Source Port | Destination Port |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| Sequence Number |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| Acknowledgment Number |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| Data | |U|A|P|R|S|F| |
| Offset| Reserved |R|C|S|S|Y|I| Window |
| | |G|K|H|T|N|N| |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| Checksum | Urgent Pointer |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| Options | Padding |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| data |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
https://datatracker.ietf.org/doc/html/rfc793#section-3.1
"""
# Для ip header чексуму можно не считать, ее kernel сам подставит
iph = make_ip_header(src_ip, dst_ip)
assert len(iph) == 20
tcph = (
src_port.to_bytes(2)
+ dst_port.to_bytes(2)
+ secrets.randbits(32).to_bytes(4) # seq num
+ b"\0\0\0\0" # ack num
# 5 - data offset в 4-байтных словах, 2 - SYN флаг
+ ((5 << 12) | 2).to_bytes(2)
+ (32_120).to_bytes(2) # window size
+ b"\0\0\0\0" # checksum + urgent pointer
)
assert len(tcph) == 20
pseudo_iph = (
socket.inet_aton(src_ip)
+ socket.inet_aton(dst_ip)
+ socket.IPPROTO_TCP.to_bytes(2) # b'\x06\x00'
+ len(tcph).to_bytes(2)
)
check = checksum(pseudo_iph + tcph)
logger.debug("tcp checksum: %x", check)
return iph + tcph[:16] + check.to_bytes(2) + tcph[18:]
def normalize_ports(ports: list[str]) -> set[int]:
rv = set()
for x in ports:
try:
a, b = map(int, x.split("-", 1))
rv.update(range(a, b + 1))
except ValueError:
rv.add(int(x))
return rv
WELL_KNOWN_PORTS = frozenset(
[
110,
143,
21,
22,
2222,
23,
25,
3306,
443,
465,
5432,
587,
5900,
6379,
80,
8080,
8443,
9000,
993,
995,
]
)
class NameSpace(argparse.Namespace):
address: str
ports: list[str]
debug: bool
timeout: float
def parse_args(
argv: list[str] | None,
) -> tuple[argparse.ArgumentParser, NameSpace]:
parser = argparse.ArgumentParser()
parser.add_argument("address", type=socket.gethostbyname)
parser.add_argument(
"-p",
"--port",
dest="ports",
nargs="+",
help="port or range of ports",
)
parser.add_argument("-t", "--timeout", type=float, default=60.0)
parser.add_argument("-d", "--debug", action="store_true", default=False)
args = parser.parse_args(argv, namespace=NameSpace())
return parser, args
def main(argv: list[str] | None = None) -> None:
_, args = parse_args(argv)
if args.debug:
logging.basicConfig()
logger.setLevel(logging.DEBUG)
ports = normalize_ports(args.ports) if args.ports else WELL_KNOWN_PORTS
local_ip = get_local_ip()
sniff_t = threading.Thread(
target=sniff,
args=(local_ip, args.address, ports),
)
sniff_t.daemon = True
sniff_t.start()
with socket.socket(
socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_RAW
) as s:
for port in ports:
src_port = random.randint(30000, 50000)
packet = make_syn_packet(local_ip, src_port, args.address, port)
n = s.sendto(packet, (args.address, 0))
logger.debug("bytes sent: %d", n)
sniff_t.join(args.timeout)
def get_service_names() -> dict[int, str]:
rv = {}
with open("/etc/services") as f:
for line in f:
if not line.strip().endswith("/tcp"):
continue
name, port = line.split()
port, _ = port.split("/")
rv[name] = int(port)
return rv
def invert_dict(d: dict) -> dict:
return dict(map(reversed, d.items()))
def sniff(local_ip: str, target_ip: str, ports: set[int]) -> None:
service_ports = invert_dict(get_service_names())
with socket.socket(
socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_TCP
) as s:
while 1:
res, addr = s.recvfrom(65535)
if addr[0] != target_ip:
continue
src_ip = socket.inet_ntoa(res[12:16])
if src_ip != target_ip:
continue
dst_ip = socket.inet_ntoa(res[16:20])
if dst_ip != local_ip:
continue
src_port = int.from_bytes(res[20:22])
# dst_port = int.from_bytes(res[22:24])
# Это самый быстрый способ проверить открыт ли порт
if res[33] == 0x012 and src_port in ports:
print(src_ip, src_port, service_ports.get(src_port), sep="\t")
if __name__ == "__main__":
sys.exit(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment