-
-
Save ScottHelme/87abff3292f3a1ea53ff373a2926e77b to your computer and use it in GitHub Desktop.
Python script to send a partial request to PHP-FPM using FastCGI, taken from https://gist.github.com/poupas/4d2f8f082f7938731495a2732321f377
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import socket | |
import argparse | |
import random | |
import select | |
import struct | |
import errno | |
import time | |
import threading | |
import sys | |
from typing import Tuple | |
# FastCGI protocol version, written into byte 0 of every record header.
FCGI_VERSION = 1
# Size in bytes of the fixed record header that precedes every record body.
FCGI_HEADER_LEN = 8

# The role the FastCGI process should take. Only FCGI_ROLE_RESPONDER is supported.
FCGI_ROLE_RESPONDER = 1

# Whether the connection should be kept alive after the request has been processed.
FCGI_KEEP_CONN = 1

# Record types (byte 1 of the record header)
FCGI_BEGIN_RECORD = 1
FCGI_ABORT_RECORD = 2
FCGI_END_RECORD = 3
FCGI_PARAMS_RECORD = 4
FCGI_STDIN_RECORD = 5
FCGI_STDOUT_RECORD = 6
FCGI_STDERR_RECORD = 7
def create_fcgi_record(record_type: int, request_id: int, body: bytes) -> bytes:
    """Build a single FastCGI record: 8-byte header, body, zero padding.

    See: https://www.mit.edu/~yandros/doc/specs/fcgi-spec.html#S3.3

    Args:
        record_type: Value for the header's type byte (0-255).
        request_id: Request id, stored as a 16-bit big-endian integer.
        body: Record content. Its length is stored in a 16-bit field, so it
            must not exceed 65535 bytes.

    Returns:
        The encoded record. The body is padded with zero bytes up to the
        next multiple of 8.

    Raises:
        ValueError: If record_type, request_id or the body length does not
            fit in its header field (previously such values were silently
            truncated, producing a corrupt record).
    """
    body_len = len(body)
    if not 0 <= record_type <= 0xFF:
        raise ValueError(f"record_type {record_type} does not fit in one byte")
    if not 0 <= request_id <= 0xFFFF:
        raise ValueError(f"request_id {request_id} does not fit in 16 bits")
    if body_len > 0xFFFF:
        raise ValueError(f"record body of {body_len} bytes exceeds the 65535 byte limit")

    # Number of zero bytes needed to align the body to an 8-byte boundary.
    pad_len = -body_len % 8

    # Header layout: version, type, requestId (16 bit), contentLength (16 bit),
    # paddingLength, reserved (always 0).
    header = struct.pack(">BBHHBx", FCGI_VERSION, record_type, request_id,
                         body_len, pad_len)
    return header + bytes(body) + bytes(pad_len)
def encode_fcgi_length(length: int) -> bytes:
    """Encode a name or value length in the FastCGI name-value pair format.

    See: https://www.mit.edu/~yandros/doc/specs/fcgi-spec.html#S3.4

    Args:
        length: Length to encode, between 0 and 2**31 - 1 inclusive.

    Returns:
        One byte for lengths below 128; otherwise four big-endian bytes with
        the most significant bit set.

    Raises:
        ValueError: If length is negative or needs more than 31 bits, since
            the top bit of the 4-byte form is reserved as the format marker.
    """
    if not 0 <= length <= 0x7FFFFFFF:
        raise ValueError(f"length {length} cannot be encoded as a FastCGI length")
    # If the length is less than 128, we can encode it in a single byte
    if length < 128:
        return bytes([length])
    # Otherwise, encode the length as a 4 byte big-endian integer and set the
    # first bit of the first byte to 1
    return struct.pack('>I', length | 0x80000000)
def encode_fcgi_name_value_pairs(pairs: dict[str, str|int]) -> bytes:
    """Encode a dictionary as a stream of FastCGI name-value pairs.

    See: https://www.mit.edu/~yandros/doc/specs/fcgi-spec.html#S3.4

    Each pair is emitted as: encoded name length, encoded value length,
    name bytes, value bytes. Names and values are UTF-8 encoded; integer
    values are converted to their decimal string form first.

    Args:
        pairs: Mapping of parameter names to string or integer values.

    Returns:
        The concatenated encoding of all pairs, in the dictionary's
        iteration order. Empty when pairs is empty.
    """
    pieces = []
    for name, value in pairs.items():
        if isinstance(value, int):
            value = str(value)
        name_bytes = name.encode('utf-8')
        value_bytes = value.encode('utf-8')
        pieces.append(encode_fcgi_length(len(name_bytes)))
        pieces.append(encode_fcgi_length(len(value_bytes)))
        pieces.append(name_bytes)
        pieces.append(value_bytes)
    return b''.join(pieces)
def create_fcgi_request(params: dict[str, str|int], request_body=None) -> list[bytes]:
    """Build the list of FastCGI records that make up one request.

    We are hardcoding a bunch of stuff, and doing the bare minimum to get a
    response from fpm-php :)

    The records are, in order: one BEGIN record, the PARAMS record(s) (only
    when params is non-empty), an empty PARAMS record, and - only when
    request_body is not None - the STDIN records followed by an empty STDIN
    record.

    Args:
        params: Name-value pairs to send as FastCGI parameters.
        request_body: Bytes to send on the STDIN stream, or None to send no
            STDIN records at all.

    Returns:
        The encoded records, all carrying the same randomly chosen request
        id in the range 1-65535.
    """
    # A record's content length is a 16-bit field.
    max_record_body = 0xFFFF
    # Use 32 KB body records to mimic the behavior of the nginx FastCGI module.
    stdin_chunk_size = 32 * 1024

    request_id = random.randint(1, 65535)

    # Begin request: https://www.mit.edu/~yandros/doc/specs/fcgi-spec.html#S5.1
    # Layout: 16-bit role, flags byte, 5 reserved bytes. The flags byte is 0,
    # i.e. FCGI_KEEP_CONN is not set.
    begin_body = struct.pack(">HB5x", FCGI_ROLE_RESPONDER, 0)
    request = [create_fcgi_record(FCGI_BEGIN_RECORD, request_id, begin_body)]

    # Parameters. The encoded stream is split so that no single record
    # exceeds the 16-bit content length limit.
    if params:
        params_body = encode_fcgi_name_value_pairs(params)
        for start in range(0, len(params_body), max_record_body):
            chunk = params_body[start:start + max_record_body]
            request.append(create_fcgi_record(FCGI_PARAMS_RECORD, request_id, chunk))

    # An empty parameter record indicates the end of the parameters
    request.append(create_fcgi_record(FCGI_PARAMS_RECORD, request_id, b''))

    # Split the request body into FastCGI STDIN records of 8 bytes (header) + 32 KB (body).
    # See: https://web.mit.edu/~yandros/doc/specs/fcgi-spec.html#S5.3
    if request_body is not None:
        # Note that we're copying the request body into memory and later sending
        # the full records with send() (in send_data).
        # nginx (on Linux) uses writev() to send the record header, and then
        # sendfile() to send the record body (at least for larger bodies).
        # We are not doing that here to keep things simple.
        for start in range(0, len(request_body), stdin_chunk_size):
            chunk = request_body[start:start + stdin_chunk_size]
            request.append(create_fcgi_record(FCGI_STDIN_RECORD, request_id, chunk))

        # An empty record indicates the end of the request body
        request.append(create_fcgi_record(FCGI_STDIN_RECORD, request_id, b''))

    return request
def parse_fcgi_response(response: bytes) -> Tuple[int, bytes]:
    """Parse the first FastCGI record in *response*.

    Currently not being used: the script only prints the raw server
    response, so nothing needs the parsed form yet.

    The body is delimited by the content length stored in the record header.
    Padding and any further records that follow are ignored.

    Args:
        response: Raw bytes starting at a FastCGI record header.

    Returns:
        A ``(record_type, body)`` tuple for the first record.

    Raises:
        ValueError: If response is shorter than a record header, or shorter
            than the content length announced by that header.
    """
    # Header layout: version, type, requestId (16 bit), contentLength (16 bit),
    # paddingLength, reserved. This format is 8 bytes long.
    header_format = ">BBHHBx"
    header_len = struct.calcsize(header_format)

    if len(response) < header_len:
        raise ValueError(
            f"FastCGI record header needs {header_len} bytes, got {len(response)}")

    _version, response_type, _request_id, content_len, _pad_len = struct.unpack_from(
        header_format, response)

    body_end = header_len + content_len
    if len(response) < body_end:
        raise ValueError(
            f"FastCGI record announces {content_len} body bytes, "
            f"but only {len(response) - header_len} are available")

    return response_type, bytes(response[header_len:body_end])
# Keep a reference to the print that is in scope before it is shadowed below.
orig_print = print


def print(s: object = "", *args, **kwargs):
    """Same as print, but prepend the native thread id to the output.

    Args:
        s: First object to print; it is formatted after the "[<thread id>] "
            prefix.
        *args: Further objects, forwarded unchanged to the original print.
        **kwargs: Keyword arguments (sep, end, file, flush), forwarded
            unchanged to the original print.
    """
    orig_print(f"[{threading.get_native_id()}] {s}", *args, **kwargs)
def send_data(s: socket.socket, data: bytes, timeout: int=0):
    """Send all of *data* on socket *s*, waiting with epoll for writability.

    See: https://docs.python.org/3/library/select.html#select.epoll

    Args:
        s: Socket to write to. The EAGAIN handling below assumes it is in
            non-blocking mode.
        data: Bytes to send.
        timeout: Maximum number of seconds allowed without any successful
            write. 0 disables the timeout.

    Raises:
        TimeoutError: If timeout is non-zero and no write succeeded for more
            than timeout seconds.
        OSError: Any error raised by send() other than EAGAIN/EWOULDBLOCK.
    """
    # Wake up at least every 10 seconds to report progress, or sooner when
    # the caller asked for a shorter timeout.
    poll_timeout = 10
    if timeout and timeout < poll_timeout:
        poll_timeout = timeout

    # Slicing a memoryview avoids copying the unsent tail on every send().
    view = memoryview(data)
    total = len(view)

    bytes_sent = 0
    last_write = time.monotonic()

    # The context manager closes the epoll object (dropping the registration)
    # on every exit path, including the exceptions raised below.
    with select.epoll() as epoll:
        # Register for EPOLLOUT to wait for the socket to be ready for writing
        epoll.register(s, select.EPOLLOUT|select.EPOLLERR|select.EPOLLHUP)

        # Keep sending until we've sent all the data
        while bytes_sent < total:
            events = epoll.poll(poll_timeout)
            if not events:
                now = time.monotonic()
                print(f"Socket not ready for writing. bytes sent: {bytes_sent}, "
                      f"time since last write: {int(now-last_write)} seconds")
                if timeout and now - last_write > timeout:
                    raise TimeoutError(f"Timeout of {timeout} seconds reached")
                continue

            # epoll says we can write now, write until we've sent everything or until
            # we get EAGAIN, in which case we need to wait for write readiness.
            while bytes_sent < total:
                try:
                    send_size = s.send(view[bytes_sent:])
                except OSError as err:
                    # If we get EAGAIN, we need to wait for the socket to be ready again.
                    if err.errno in (errno.EAGAIN, errno.EWOULDBLOCK):
                        print(f"Socket is not ready (EAGAIN) {total-bytes_sent} bytes remaining. "
                              "Stopping send until ready...")
                        break
                    # Any other error is unexpected and likely fatal. We must stop sending
                    # and raise an exception
                    raise
                last_write = time.monotonic()
                bytes_sent += send_size
def recv_data(s: socket.socket, timeout: int=0) -> bytes:
    """Read from socket *s* until EOF or EAGAIN, waiting with epoll for readability.

    See: https://docs.python.org/3/library/select.html#select.epoll

    Args:
        s: Socket to read from. The EAGAIN handling below assumes it is in
            non-blocking mode.
        timeout: Maximum number of seconds allowed without any successful
            read. 0 disables the timeout.

    Returns:
        Everything read until recv() returned an empty chunk (EOF) or failed
        with EAGAIN/EWOULDBLOCK. Empty when EOF was the first thing read.

    Raises:
        TimeoutError: If timeout is non-zero and nothing was read for more
            than timeout seconds.
        OSError: Any error raised by recv() other than EAGAIN/EWOULDBLOCK.
    """
    # Wake up at least every 10 seconds to report progress, or sooner when
    # the caller asked for a shorter timeout.
    poll_timeout = 10
    if timeout and timeout < poll_timeout:
        poll_timeout = timeout

    data = bytearray()
    last_read = time.monotonic()

    # The context manager closes the epoll object (dropping the registration)
    # on every exit path, including the exceptions raised below.
    with select.epoll() as epoll:
        # Register for EPOLLIN to wait for the socket to be ready for reading
        epoll.register(s, select.EPOLLIN|select.EPOLLERR|select.EPOLLHUP)

        while True:
            events = epoll.poll(poll_timeout)
            if not events:
                now = time.monotonic()
                print(f"Socket not ready for reading. bytes read: {len(data)}, "
                      f"time since last read: {int(now-last_read)} seconds")
                if timeout and now - last_read > timeout:
                    raise TimeoutError(f"Timeout of {timeout} seconds reached")
                continue

            # epoll says we can read now
            try:
                chunk = s.recv(8192)
            except OSError as err:
                # If we get EAGAIN, the socket is not closed but no further data is
                # available (for now). Return what we have.
                if err.errno in (errno.EAGAIN, errno.EWOULDBLOCK):
                    print("Socket is not ready (EAGAIN). Returning what we've got "
                          f"({len(data)} bytes)")
                    break
                # Any other error is unexpected and likely fatal. We must stop reading
                # and raise an exception
                raise

            # Empty chunk means EOF. We're done.
            if not chunk:
                break
            last_read = time.monotonic()
            data += chunk

    return bytes(data)
# Send a FastCGI request to a server. | |
# If partial is True, the socket will be closed after a random record has been sent. | |
# The list of parameters was taken from a curl request to a nginx server. | |
def send_request(address, request_body: bytes, partial: bool, timeout: int): | |
# Parse the address | |
# Unix socket | |
if address.startswith("unix:"): | |
sockaddr = address[5:] | |
family = socket.AF_UNIX | |
# TCP socket | |
else: | |
if ":" not in address: | |
print("Error: Invalid address format. Expected IP:PORT") | |
return | |
host, portstr = address.split(":") | |
try: | |
addr_info = socket.getaddrinfo(host, portstr, proto=socket.IPPROTO_TCP) | |
family, _, _, _, sockaddr = addr_info[0] | |
except socket.gaierror: | |
print(f"Error: Unable to resolve hostname {host}.") | |
return | |
address = f"{sockaddr[0]}:{portstr}" | |
# Connect to the server | |
s = socket.socket(family, socket.SOCK_STREAM) | |
s.setblocking(False) | |
try: | |
print(f"Connecting to {address}...") | |
s.connect(sockaddr) | |
except socket.error as err: | |
if err.errno != errno.EINPROGRESS: | |
print(f"Error: Unable to connect to {address}: {err}") | |
return | |
# nginx seems to use the following param: | |
params: dict[str, str|int] = { | |
"PATH_INFO": "", | |
"SCRIPT_FILENAME": "/var/www/html/index.php", | |
"QUERY_STRING": "", | |
"REQUEST_METHOD": "POST", | |
"CONTENT_TYPE": "text/plain", | |
"CONTENT_LENGTH": len(request_body), | |
"SCRIPT_NAME": "/index.php", | |
"REQUEST_URI": "/index.php", | |
"DOCUMENT_URI": "/index.php", | |
"DOCUMENT_ROOT": "/var/www/html", | |
"SERVER_PROTOCOL": "HTTP/1.1", | |
"REQUEST_SCHEME": "http", | |
"GATEWAY_INTERFACE": "CGI/1.1", | |
"SERVER_SOFTWARE": "nginx/1.24.0", | |
"REMOTE_ADDR": "", | |
"REMOTE_PORT": "", | |
"REMOTE_USER": "", | |
"SERVER_ADDR": "", | |
"SERVER_PORT": "80", | |
"SERVER_NAME": "_", | |
"REDIRECT_STATUS": "200", | |
"HTTP_HOST": "", | |
"HTTP_USER_AGENT": "curl/7.81.0", | |
"HTTP_ACCEPT": "*/*", | |
"HTTP_EXPECT": "100-continue", | |
} | |
fcgi_request = create_fcgi_request(params, request_body) | |
num_records = len(fcgi_request) | |
print(f"Sending {num_records} FastCGI records...") | |
last_record = random.randint(0, num_records-1) if partial else num_records | |
# Send each record of the request | |
for i, record in enumerate(fcgi_request): | |
try: | |
send_data(s, record, timeout) | |
except OSError as oserr: | |
print(f"Error sending record {i+1}/{num_records}: {oserr}. Closing socket") | |
s.close() | |
return | |
bodylen = len(record) - FCGI_HEADER_LEN | |
print(f" * Sent record {i+1}/{num_records} ({FCGI_HEADER_LEN} + {bodylen} bytes)") | |
if i == last_record: | |
print(f"Closing socket after record {i+1}!") | |
s.close() | |
return | |
print(f"Finished sending {num_records} records. Waiting for response...") | |
response = bytearray() | |
while True: | |
try: | |
data = recv_data(s, timeout) | |
except OSError as err: | |
print(f"Error reading server response: {err}. Closing socket") | |
s.close() | |
return | |
else: | |
if not data: | |
break | |
response += data | |
print(f"Raw server response: {response}") | |
s.close() | |
def main(argv=None):
    """Parse the command line and send the requested FastCGI requests.

    Args:
        argv: Argument list to parse; None makes argparse use sys.argv[1:].

    Exits with status -1 when the body file cannot be read, and with status
    0 once all workers have finished (unless --keep-running is given, in
    which case it idles until Ctrl+C).
    """
    parser = argparse.ArgumentParser(description="Send a request to a FastCGI server")
    parser.add_argument("address", help="The address of the server in the format IP:PORT or unix:/path/to/socket")
    parser.add_argument("body_path", help="Path to the file containing the body")
    parser.add_argument("--partial", default=False, action="store_true", help="Close the socket at a random record if enabled")
    parser.add_argument("--timeout", default=0, type=int, help="Timeout in seconds for socket read and write operations")
    parser.add_argument("--parallel", default=1, type=int, help="Number of parallel requests to send")
    parser.add_argument("--keep-running", default=False, action="store_true", help="Keep running after all requests have been sent")
    args = parser.parse_args(argv)

    # Try to read the body file
    try:
        with open(args.body_path, 'rb') as f:
            request_body = f.read()
    except FileNotFoundError:
        print(f"Error: File {args.body_path} not found.")
        sys.exit(-1)
    except OSError as err:
        # Permission problems, directories, I/O errors, ...
        print(f"Error: Unable to read {args.body_path}: {err}")
        sys.exit(-1)

    # Mimic curl by stripping all \r\n instances from the body
    request_body = request_body.replace(b"\r\n", b"")

    # Start the workers, 20 ms apart
    workers = []
    for i in range(args.parallel):
        print(f"Sending request {i+1}/{args.parallel}")
        worker = threading.Thread(target=send_request,
                                  args=(args.address, request_body, args.partial, args.timeout))
        time.sleep(0.02)
        worker.start()
        workers.append(worker)

    if not args.keep_running:
        for worker in workers:
            worker.join()
        sys.exit(0)

    print("Running. Press Ctrl+C to exit...")
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        pass


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment