Skip to content

Instantly share code, notes, and snippets.

@Heaciy
Last active October 20, 2025 09:38
Show Gist options
  • Select an option

  • Save Heaciy/95e7ae3120454efe4af97821da9080f6 to your computer and use it in GitHub Desktop.

Select an option

Save Heaciy/95e7ae3120454efe4af97821da9080f6 to your computer and use it in GitHub Desktop.
Generate pcap file of http1.1 using scapy
# -*- encoding: utf-8 -*-
import json
import random
import hashlib
import ipaddress
from pathlib import Path
from typing import List, Dict
from http import HTTPStatus, HTTPMethod
from scapy.utils import wrpcap
from scapy.packet import Packet
from scapy.layers.l2 import Ether
from scapy.layers.inet import IP as IPv4, TCP # NOQA
from scapy.layers.inet6 import IPv6
from scapy.layers.http import Raw
MTU = 1500
def is_ipv6(ip_str):
"""判断 IP 是否为 IPv6"""
try:
ip = ipaddress.ip_address(ip_str)
return isinstance(ip, ipaddress.IPv6Address)
except ValueError:
return False
def format_host(dip, dport=None, domain=None):
"""格式化 HTTP 请求头中 Host 字段"""
formatted_dip = f"[{dip}]" if is_ipv6(dip) else dip
return f"{domain or formatted_dip}:{dport or 80}"
def gen_mac_from_ip(ip_str):
"""根据 IP 生成固定的 MAC 地址"""
ip_obj = ipaddress.ip_address(ip_str)
sha256 = hashlib.sha256(ip_obj.packed)
mac_bytes = [b for b in sha256.digest()[:6]]
# 将第 1 个字节的最高 2 位设置为 0x02 以确保为私有非广播地址
mac_bytes[0] = (mac_bytes[0] | 0x02) & 0xFE
return ':'.join(f'{b:02x}' for b in mac_bytes)
def gen_http_request(dip, dport, url,
domain: None | str = None,
method: HTTPMethod | str = "GET",
header: None | Dict = None,
body: None | List | Dict | str = None):
"""生成 HTTP 请求报文"""
method = HTTPMethod(method.upper())
header_lines = [f"{method} /{url.lstrip('/')} HTTP/1.1"]
header_ = {
"Content-Type": "application/json",
"Host": format_host(dip, dport, domain),
}
if header:
header_.update(header)
body_content = body if body else None
if isinstance(body, (list, dict)):
body_content = json.dumps(body, ensure_ascii=False)
header_["Content-Length"] = f"{len(body_content.encode(encoding='utf-8')) if body_content else 0}"
header_lines.extend([f"{k}: {v}" for k, v in header_.items()])
header_content = "\r\n".join(header_lines)
response = f"{header_content}\r\n\r\n{body_content}" if body_content else f"{header_content}\r\n\r\n"
return response
def gen_http_response(status_code: HTTPStatus | int = 200,
header: None | Dict = None,
body: None | List | Dict | str = None):
"""生成 HTTP 响应报文"""
status_info = HTTPStatus(status_code).phrase
first_line = f"HTTP/1.1 {status_code} {status_info}"
default_body = {"status": "200", "message": "success"}
body_ = default_body if body is None else body
body_content = json.dumps(body_, ensure_ascii=False) if isinstance(body_, (list, dict)) else body_
# HTTP Header
header_lines = [first_line]
header_ = {
"Content-Type": "application/json",
"Content-Length": f"{len(body_content.encode(encoding='utf-8'))}"
}
if header:
header_.update(header)
for k, v in header_.items():
if k in ("Set-Cookie",) and isinstance(v, list):
header_lines.extend([f"{k}: {v_}" for v_ in v])
else:
header_lines.append(f"{k}: {v}")
header_content = "\r\n".join(header_lines)
return f"{header_content}\r\n\r\n{body_content}"
def gen_http_pcap(
src_ip="1.1.1.1",
src_port=None,
dst_ip="2.2.2.2",
dst_port=80,
domain: None | str = None,
request: None | str | List[str] = None,
response: None | str | List[str] = None,
src_mac: None | str = None,
dst_mac: None | str = None,
pcap_name: None | str = None # 指定了名字才写入 pcap 文件
) -> List[Packet]:
"""生成 HTTP pcap 包"""
assert is_ipv6(src_ip) == is_ipv6(dst_ip)
if is_ipv6(src_ip):
IP = IPv6 # NOQA
MSS = MTU - 40 - 20 # NOQA
else:
IP = IPv4 # NOQA
MSS = MTU - 20 - 20 # NOQA
src_mac = gen_mac_from_ip(src_ip) if not src_mac else src_mac
dst_mac = gen_mac_from_ip(dst_ip) if not dst_mac else dst_mac
src_port = src_port if src_port else random.randint(20000, 50000)
client_seq_start = random.randint(10, 2 ** 30)
server_seq_start = random.randint(10, 2 ** 30)
client_ip_base = Ether(src=src_mac, dst=dst_mac) / IP(src=src_ip, dst=dst_ip)
server_ip_base = Ether(src=dst_mac, dst=src_mac) / IP(src=dst_ip, dst=src_ip)
client_ports = {"sport": src_port, "dport": dst_port}
server_ports = {"sport": dst_port, "dport": src_port}
default_request = f"GET / HTTP/1.1\r\nHost: {format_host(dst_ip, dst_port, domain)}\r\nConnection: close\r\n\r\n"
default_response = "HTTP/1.1 200 OK\r\nContent-Type: text/html\r\nContent-Length: 0\r\nConnection: close\r\n\r\n"
assert type(request) == type(response)
assert request is None or isinstance(request, str) or len(request) == len(response)
if not request or not response:
request_list, response_list = [default_request], [default_response]
elif isinstance(request, str):
request_list, response_list = [request], [response]
else:
request_list, response_list = request, response
# 构造 SYN 数据包
tcp_syn = client_ip_base / TCP(**client_ports,
flags="S",
seq=client_seq_start, )
# 构造 SYN-ACK 数据包
tcp_syn_ack = server_ip_base / TCP(**server_ports,
flags="SA",
seq=server_seq_start,
ack=tcp_syn[TCP].seq + 1, )
# 构造 ACK 数据包
tcp_ack_handshake = client_ip_base / TCP(**client_ports,
flags="A",
seq=tcp_syn_ack[TCP].ack,
ack=tcp_syn_ack[TCP].seq + 1, )
def _gen_http_data_packets(last_server_packet: Packet, # 客户端上次收到的服务器发送的TCP包
request: str, response: str) -> List[Packet]: # NOQA
"""生成 HTTP 数据交换时的包"""
packet_flags = last_server_packet[TCP].flags
# 三次握手时的 SYN-ACK / 传输数据时的 PUSH-ACK
assert packet_flags in ("SA", "PA")
tcp_segment_len = 1 if packet_flags == "SA" else len(last_server_packet[Raw].load)
request_bytes = request.encode(encoding="utf-8")
response_bytes = response.encode(encoding="utf-8")
def _get_http_chunks(http_bytes: bytes) -> List[bytes]:
"""根据 MSS 拆分 HTTP 包"""
http_length = len(http_bytes)
if http_length <= MSS:
return [http_bytes]
return [http_bytes[i:i + MSS] for i in range(0, http_length, MSS)]
request_chunks = _get_http_chunks(request_bytes)
response_chunks = _get_http_chunks(response_bytes)
http_request_packets = []
http_response_packets = []
request_seq_num = last_server_packet[TCP].ack
request_ack_num = last_server_packet[TCP].seq + tcp_segment_len
for request_chunk in request_chunks:
http_request_packets.append(
client_ip_base
/ TCP(**client_ports, flags="PA", seq=request_seq_num, ack=request_ack_num, )
/ Raw(request_chunk)
)
request_seq_num += len(request_chunk)
http_request_ack = server_ip_base / TCP(**server_ports, flags="A", seq=request_ack_num, ack=request_seq_num, )
response_seq_num = http_request_ack[TCP].seq
response_ack_num = http_request_ack[TCP].ack
for response_chunk in response_chunks:
http_response_packets.append(
server_ip_base
/ TCP(**server_ports, flags="PA", seq=response_seq_num, ack=response_ack_num, )
/ Raw(response_chunk)
)
response_seq_num += len(response_chunk)
http_response_ack = client_ip_base / TCP(**client_ports, flags="A", seq=response_ack_num,
ack=response_seq_num, )
return [*http_request_packets,
http_request_ack,
*http_response_packets,
http_response_ack]
# 构造所有的 HTTP 数据传输的 TCP 包,相当于一次长连接中发送和接受的数据
http_data_packets = _gen_http_data_packets(tcp_syn_ack, request_list[0], response_list[0])
for i, request in enumerate(request_list[1:], start=1):
response = response_list[i]
http_data_packets.extend(_gen_http_data_packets(http_data_packets[-2], request, response))
# 四次挥手 客户端发起
tcp_fin_client = client_ip_base / TCP(**client_ports,
flags="FA",
seq=http_data_packets[-1][TCP].seq,
ack=http_data_packets[-1][TCP].ack, )
tcp_ack_close1 = server_ip_base / TCP(**server_ports,
flags="A",
seq=tcp_fin_client[TCP].ack,
ack=tcp_fin_client[TCP].seq + 1, )
tcp_fin_server = server_ip_base / TCP(**server_ports,
flags="FA",
seq=tcp_ack_close1[TCP].seq,
ack=tcp_fin_client[TCP].seq + 1, )
tcp_ack_close2 = client_ip_base / TCP(**client_ports,
flags="A",
seq=tcp_fin_server[TCP].ack,
ack=tcp_fin_server[TCP].seq + 1, )
http_traffic = [
# 三次握手
tcp_syn,
tcp_syn_ack,
tcp_ack_handshake,
# HTTP 数据交换
*http_data_packets,
# 四次挥手
tcp_fin_client,
tcp_ack_close1,
tcp_fin_server,
tcp_ack_close2,
]
# 将流量报文保存到本地为 .pcap 文件
if pcap_name:
path = Path(pcap_name).with_suffix('.pcap')
wrpcap(str(path), http_traffic)
return http_traffic
def test_user_login():
filename = "test_user_login.pcap"
# 源目 IPv4 信息
# dip, dport = "10.67.2.41", 80
# sip, sport = "10.67.0.63", 10086
# 源目 IPv6 信息
dip, dport = "95b8:f4da:2fbd:40f9:73bd:f615:56ae:90c7", 80
sip, sport = "4f5d:c497:8cb2:2a8e:1644:b63:6582:e861", 10086
# 第一次请求为请求登录接口
url_login = "api/v1/auth/login"
req_body_login = {"username": "admin", "password": "Abc@abc123456"}
res_header_login = {"Set-Cookie": "sessionid=4usn86xe9navazbesstlc6dysunbmxgr"}
request_login = gen_http_request(dip, dport, url_login, method="POST", body=req_body_login)
response_login = gen_http_response(header=res_header_login)
# 第二次请求为请求用户信息接口
url_info = "api/v1/users/admin"
req_header_info = {"Cookie": "sessionid=4usn86xe9navazbesstlc6dysunbmxgr"}
res_body_info = {"username": "admin", "phone": "18056662752", "email": "abc@abc.com", "text": "abc" * 1000}
request_info = gen_http_request(dip, dport, url_info, header=req_header_info)
response_info = gen_http_response(body=res_body_info)
request_list = [request_login, request_info]
response_list = [response_login, response_info]
gen_http_pcap(src_ip=sip,
src_port=sport,
dst_ip=dip,
dst_port=dport,
request=request_list,
response=response_list,
pcap_name=filename)
if __name__ == '__main__':
test_user_login()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment