Last active
October 20, 2025 09:38
-
-
Save Heaciy/95e7ae3120454efe4af97821da9080f6 to your computer and use it in GitHub Desktop.
Generate pcap file of http1.1 using scapy
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # -*- encoding: utf-8 -*- | |
| import json | |
| import random | |
| import hashlib | |
| import ipaddress | |
| from pathlib import Path | |
| from typing import List, Dict | |
| from http import HTTPStatus, HTTPMethod | |
| from scapy.utils import wrpcap | |
| from scapy.packet import Packet | |
| from scapy.layers.l2 import Ether | |
| from scapy.layers.inet import IP as IPv4, TCP # NOQA | |
| from scapy.layers.inet6 import IPv6 | |
| from scapy.layers.http import Raw | |
| MTU = 1500 | |
| def is_ipv6(ip_str): | |
| """判断 IP 是否为 IPv6""" | |
| try: | |
| ip = ipaddress.ip_address(ip_str) | |
| return isinstance(ip, ipaddress.IPv6Address) | |
| except ValueError: | |
| return False | |
| def format_host(dip, dport=None, domain=None): | |
| """格式化 HTTP 请求头中 Host 字段""" | |
| formatted_dip = f"[{dip}]" if is_ipv6(dip) else dip | |
| return f"{domain or formatted_dip}:{dport or 80}" | |
| def gen_mac_from_ip(ip_str): | |
| """根据 IP 生成固定的 MAC 地址""" | |
| ip_obj = ipaddress.ip_address(ip_str) | |
| sha256 = hashlib.sha256(ip_obj.packed) | |
| mac_bytes = [b for b in sha256.digest()[:6]] | |
| # 将第 1 个字节的最高 2 位设置为 0x02 以确保为私有非广播地址 | |
| mac_bytes[0] = (mac_bytes[0] | 0x02) & 0xFE | |
| return ':'.join(f'{b:02x}' for b in mac_bytes) | |
| def gen_http_request(dip, dport, url, | |
| domain: None | str = None, | |
| method: HTTPMethod | str = "GET", | |
| header: None | Dict = None, | |
| body: None | List | Dict | str = None): | |
| """生成 HTTP 请求报文""" | |
| method = HTTPMethod(method.upper()) | |
| header_lines = [f"{method} /{url.lstrip('/')} HTTP/1.1"] | |
| header_ = { | |
| "Content-Type": "application/json", | |
| "Host": format_host(dip, dport, domain), | |
| } | |
| if header: | |
| header_.update(header) | |
| body_content = body if body else None | |
| if isinstance(body, (list, dict)): | |
| body_content = json.dumps(body, ensure_ascii=False) | |
| header_["Content-Length"] = f"{len(body_content.encode(encoding='utf-8')) if body_content else 0}" | |
| header_lines.extend([f"{k}: {v}" for k, v in header_.items()]) | |
| header_content = "\r\n".join(header_lines) | |
| response = f"{header_content}\r\n\r\n{body_content}" if body_content else f"{header_content}\r\n\r\n" | |
| return response | |
| def gen_http_response(status_code: HTTPStatus | int = 200, | |
| header: None | Dict = None, | |
| body: None | List | Dict | str = None): | |
| """生成 HTTP 响应报文""" | |
| status_info = HTTPStatus(status_code).phrase | |
| first_line = f"HTTP/1.1 {status_code} {status_info}" | |
| default_body = {"status": "200", "message": "success"} | |
| body_ = default_body if body is None else body | |
| body_content = json.dumps(body_, ensure_ascii=False) if isinstance(body_, (list, dict)) else body_ | |
| # HTTP Header | |
| header_lines = [first_line] | |
| header_ = { | |
| "Content-Type": "application/json", | |
| "Content-Length": f"{len(body_content.encode(encoding='utf-8'))}" | |
| } | |
| if header: | |
| header_.update(header) | |
| for k, v in header_.items(): | |
| if k in ("Set-Cookie",) and isinstance(v, list): | |
| header_lines.extend([f"{k}: {v_}" for v_ in v]) | |
| else: | |
| header_lines.append(f"{k}: {v}") | |
| header_content = "\r\n".join(header_lines) | |
| return f"{header_content}\r\n\r\n{body_content}" | |
| def gen_http_pcap( | |
| src_ip="1.1.1.1", | |
| src_port=None, | |
| dst_ip="2.2.2.2", | |
| dst_port=80, | |
| domain: None | str = None, | |
| request: None | str | List[str] = None, | |
| response: None | str | List[str] = None, | |
| src_mac: None | str = None, | |
| dst_mac: None | str = None, | |
| pcap_name: None | str = None # 指定了名字才写入 pcap 文件 | |
| ) -> List[Packet]: | |
| """生成 HTTP pcap 包""" | |
| assert is_ipv6(src_ip) == is_ipv6(dst_ip) | |
| if is_ipv6(src_ip): | |
| IP = IPv6 # NOQA | |
| MSS = MTU - 40 - 20 # NOQA | |
| else: | |
| IP = IPv4 # NOQA | |
| MSS = MTU - 20 - 20 # NOQA | |
| src_mac = gen_mac_from_ip(src_ip) if not src_mac else src_mac | |
| dst_mac = gen_mac_from_ip(dst_ip) if not dst_mac else dst_mac | |
| src_port = src_port if src_port else random.randint(20000, 50000) | |
| client_seq_start = random.randint(10, 2 ** 30) | |
| server_seq_start = random.randint(10, 2 ** 30) | |
| client_ip_base = Ether(src=src_mac, dst=dst_mac) / IP(src=src_ip, dst=dst_ip) | |
| server_ip_base = Ether(src=dst_mac, dst=src_mac) / IP(src=dst_ip, dst=src_ip) | |
| client_ports = {"sport": src_port, "dport": dst_port} | |
| server_ports = {"sport": dst_port, "dport": src_port} | |
| default_request = f"GET / HTTP/1.1\r\nHost: {format_host(dst_ip, dst_port, domain)}\r\nConnection: close\r\n\r\n" | |
| default_response = "HTTP/1.1 200 OK\r\nContent-Type: text/html\r\nContent-Length: 0\r\nConnection: close\r\n\r\n" | |
| assert type(request) == type(response) | |
| assert request is None or isinstance(request, str) or len(request) == len(response) | |
| if not request or not response: | |
| request_list, response_list = [default_request], [default_response] | |
| elif isinstance(request, str): | |
| request_list, response_list = [request], [response] | |
| else: | |
| request_list, response_list = request, response | |
| # 构造 SYN 数据包 | |
| tcp_syn = client_ip_base / TCP(**client_ports, | |
| flags="S", | |
| seq=client_seq_start, ) | |
| # 构造 SYN-ACK 数据包 | |
| tcp_syn_ack = server_ip_base / TCP(**server_ports, | |
| flags="SA", | |
| seq=server_seq_start, | |
| ack=tcp_syn[TCP].seq + 1, ) | |
| # 构造 ACK 数据包 | |
| tcp_ack_handshake = client_ip_base / TCP(**client_ports, | |
| flags="A", | |
| seq=tcp_syn_ack[TCP].ack, | |
| ack=tcp_syn_ack[TCP].seq + 1, ) | |
| def _gen_http_data_packets(last_server_packet: Packet, # 客户端上次收到的服务器发送的TCP包 | |
| request: str, response: str) -> List[Packet]: # NOQA | |
| """生成 HTTP 数据交换时的包""" | |
| packet_flags = last_server_packet[TCP].flags | |
| # 三次握手时的 SYN-ACK / 传输数据时的 PUSH-ACK | |
| assert packet_flags in ("SA", "PA") | |
| tcp_segment_len = 1 if packet_flags == "SA" else len(last_server_packet[Raw].load) | |
| request_bytes = request.encode(encoding="utf-8") | |
| response_bytes = response.encode(encoding="utf-8") | |
| def _get_http_chunks(http_bytes: bytes) -> List[bytes]: | |
| """根据 MSS 拆分 HTTP 包""" | |
| http_length = len(http_bytes) | |
| if http_length <= MSS: | |
| return [http_bytes] | |
| return [http_bytes[i:i + MSS] for i in range(0, http_length, MSS)] | |
| request_chunks = _get_http_chunks(request_bytes) | |
| response_chunks = _get_http_chunks(response_bytes) | |
| http_request_packets = [] | |
| http_response_packets = [] | |
| request_seq_num = last_server_packet[TCP].ack | |
| request_ack_num = last_server_packet[TCP].seq + tcp_segment_len | |
| for request_chunk in request_chunks: | |
| http_request_packets.append( | |
| client_ip_base | |
| / TCP(**client_ports, flags="PA", seq=request_seq_num, ack=request_ack_num, ) | |
| / Raw(request_chunk) | |
| ) | |
| request_seq_num += len(request_chunk) | |
| http_request_ack = server_ip_base / TCP(**server_ports, flags="A", seq=request_ack_num, ack=request_seq_num, ) | |
| response_seq_num = http_request_ack[TCP].seq | |
| response_ack_num = http_request_ack[TCP].ack | |
| for response_chunk in response_chunks: | |
| http_response_packets.append( | |
| server_ip_base | |
| / TCP(**server_ports, flags="PA", seq=response_seq_num, ack=response_ack_num, ) | |
| / Raw(response_chunk) | |
| ) | |
| response_seq_num += len(response_chunk) | |
| http_response_ack = client_ip_base / TCP(**client_ports, flags="A", seq=response_ack_num, | |
| ack=response_seq_num, ) | |
| return [*http_request_packets, | |
| http_request_ack, | |
| *http_response_packets, | |
| http_response_ack] | |
| # 构造所有的 HTTP 数据传输的 TCP 包,相当于一次长连接中发送和接受的数据 | |
| http_data_packets = _gen_http_data_packets(tcp_syn_ack, request_list[0], response_list[0]) | |
| for i, request in enumerate(request_list[1:], start=1): | |
| response = response_list[i] | |
| http_data_packets.extend(_gen_http_data_packets(http_data_packets[-2], request, response)) | |
| # 四次挥手 客户端发起 | |
| tcp_fin_client = client_ip_base / TCP(**client_ports, | |
| flags="FA", | |
| seq=http_data_packets[-1][TCP].seq, | |
| ack=http_data_packets[-1][TCP].ack, ) | |
| tcp_ack_close1 = server_ip_base / TCP(**server_ports, | |
| flags="A", | |
| seq=tcp_fin_client[TCP].ack, | |
| ack=tcp_fin_client[TCP].seq + 1, ) | |
| tcp_fin_server = server_ip_base / TCP(**server_ports, | |
| flags="FA", | |
| seq=tcp_ack_close1[TCP].seq, | |
| ack=tcp_fin_client[TCP].seq + 1, ) | |
| tcp_ack_close2 = client_ip_base / TCP(**client_ports, | |
| flags="A", | |
| seq=tcp_fin_server[TCP].ack, | |
| ack=tcp_fin_server[TCP].seq + 1, ) | |
| http_traffic = [ | |
| # 三次握手 | |
| tcp_syn, | |
| tcp_syn_ack, | |
| tcp_ack_handshake, | |
| # HTTP 数据交换 | |
| *http_data_packets, | |
| # 四次挥手 | |
| tcp_fin_client, | |
| tcp_ack_close1, | |
| tcp_fin_server, | |
| tcp_ack_close2, | |
| ] | |
| # 将流量报文保存到本地为 .pcap 文件 | |
| if pcap_name: | |
| path = Path(pcap_name).with_suffix('.pcap') | |
| wrpcap(str(path), http_traffic) | |
| return http_traffic | |
| def test_user_login(): | |
| filename = "test_user_login.pcap" | |
| # 源目 IPv4 信息 | |
| # dip, dport = "10.67.2.41", 80 | |
| # sip, sport = "10.67.0.63", 10086 | |
| # 源目 IPv6 信息 | |
| dip, dport = "95b8:f4da:2fbd:40f9:73bd:f615:56ae:90c7", 80 | |
| sip, sport = "4f5d:c497:8cb2:2a8e:1644:b63:6582:e861", 10086 | |
| # 第一次请求为请求登录接口 | |
| url_login = "api/v1/auth/login" | |
| req_body_login = {"username": "admin", "password": "Abc@abc123456"} | |
| res_header_login = {"Set-Cookie": "sessionid=4usn86xe9navazbesstlc6dysunbmxgr"} | |
| request_login = gen_http_request(dip, dport, url_login, method="POST", body=req_body_login) | |
| response_login = gen_http_response(header=res_header_login) | |
| # 第二次请求为请求用户信息接口 | |
| url_info = "api/v1/users/admin" | |
| req_header_info = {"Cookie": "sessionid=4usn86xe9navazbesstlc6dysunbmxgr"} | |
| res_body_info = {"username": "admin", "phone": "18056662752", "email": "abc@abc.com", "text": "abc" * 1000} | |
| request_info = gen_http_request(dip, dport, url_info, header=req_header_info) | |
| response_info = gen_http_response(body=res_body_info) | |
| request_list = [request_login, request_info] | |
| response_list = [response_login, response_info] | |
| gen_http_pcap(src_ip=sip, | |
| src_port=sport, | |
| dst_ip=dip, | |
| dst_port=dport, | |
| request=request_list, | |
| response=response_list, | |
| pcap_name=filename) | |
| if __name__ == '__main__': | |
| test_user_login() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment