Skip to content

Instantly share code, notes, and snippets.

@poupas
Last active November 2, 2023 17:22
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save poupas/4d2f8f082f7938731495a2732321f377 to your computer and use it in GitHub Desktop.
Save poupas/4d2f8f082f7938731495a2732321f377 to your computer and use it in GitHub Desktop.
send-fastcgi-request.py
#!/usr/bin/env python3
import socket
import argparse
import random
import select
import struct
import errno
import time
import threading
import sys
from typing import Tuple
# Protocol version written into byte 0 of every record header.
FCGI_VERSION = 1
# Size in bytes of the fixed record header.
FCGI_HEADER_LEN = 8

# The role the FastCGI process should take. Only FCGI_ROLE_RESPONDER is supported.
FCGI_ROLE_RESPONDER = 1

# Whether the connection should be kept alive after the request has been processed.
FCGI_KEEP_CONN = 1

# Record types
FCGI_BEGIN_RECORD = 1
FCGI_ABORT_RECORD = 2
FCGI_END_RECORD = 3
FCGI_PARAMS_RECORD = 4
FCGI_STDIN_RECORD = 5
FCGI_STDOUT_RECORD = 6
FCGI_STDERR_RECORD = 7


def create_fcgi_record(record_type: int, request_id: int, body: bytes) -> bytes:
    """Create a single FastCGI record (header + body + padding).

    See: https://www.mit.edu/~yandros/doc/specs/fcgi-spec.html#S3.3

    The 8-byte header holds, in order: version, record type, request id
    (16-bit big-endian), body length (16-bit big-endian), padding length and
    one reserved zero byte. The body is zero-padded to a multiple of 8 bytes.

    Args:
        record_type: One of the FCGI_*_RECORD type codes (must fit in a byte).
        request_id: Identifier of the request this record belongs to
            (must fit in 16 bits).
        body: Record payload, at most 65535 bytes.

    Returns:
        The encoded record.

    Raises:
        ValueError: If ``record_type``, ``request_id`` or ``len(body)`` does
            not fit in its header field. Masking the value instead would
            silently produce a corrupt record.
    """
    bodylen = len(body)
    if not 0 <= record_type <= 0xFF:
        raise ValueError(f"record type {record_type} does not fit in one byte")
    if not 0 <= request_id <= 0xFFFF:
        raise ValueError(f"request id {request_id} does not fit in 16 bits")
    if bodylen > 0xFFFF:
        raise ValueError(f"record body of {bodylen} bytes exceeds the 65535 byte limit")

    # Number of zero bytes needed to round the body up to a multiple of 8.
    padlen = -bodylen % 8
    # '>BBHHBx': version, type, request id, body length, pad length, reserved.
    header = struct.pack('>BBHHBx', FCGI_VERSION, record_type, request_id, bodylen, padlen)
    assert len(header) == FCGI_HEADER_LEN
    return header + bytes(body) + (b'\x00' * padlen)
def encode_fcgi_length(length: int) -> bytes:
    """Encode a length in the FastCGI name-value pair length format.

    See: https://www.mit.edu/~yandros/doc/specs/fcgi-spec.html#S3.4

    Lengths below 128 are encoded in a single byte. Larger lengths are encoded
    as a 4-byte big-endian integer with the most significant bit set to 1.

    Args:
        length: Non-negative length, at most 2**31 - 1.

    Returns:
        The 1-byte or 4-byte encoding.

    Raises:
        ValueError: If ``length`` is negative or needs more than 31 bits; the
            top bit is the format marker, so such a value cannot be
            represented and would otherwise be silently mis-encoded.
    """
    if not 0 <= length <= 0x7FFFFFFF:
        raise ValueError(f"length {length} is outside the encodable range 0..2147483647")
    # If the length is less than 128, we can encode it in a single byte
    if length < 128:
        return bytes([length])
    # Otherwise, encode the length as a 4 byte big-endian integer and set the
    # first bit of the first byte to 1
    return struct.pack('>I', length | 0x80000000)
def encode_fcgi_name_value_pairs(pairs: dict[str, str|int]) -> bytes:
    """Serialize a mapping into the FastCGI name-value pair wire format.

    See: https://www.mit.edu/~yandros/doc/specs/fcgi-spec.html#S3.4

    Each pair is emitted as: encoded name length, encoded value length, the
    UTF-8 name bytes, then the UTF-8 value bytes. Integer values are turned
    into their decimal string form before encoding. Pairs are written in the
    mapping's iteration order.
    """
    buffer = bytearray()
    for key, raw in pairs.items():
        text = str(raw) if isinstance(raw, int) else raw
        key_octets = key.encode('utf-8')
        text_octets = text.encode('utf-8')
        # Both lengths come first, followed by the two payloads.
        for piece in (
            encode_fcgi_length(len(key_octets)),
            encode_fcgi_length(len(text_octets)),
            key_octets,
            text_octets,
        ):
            buffer += piece
    return buffer
def create_fcgi_request(params: dict[str, str|int], request_body=None) -> list[bytes]:
    """Create the list of FastCGI records that make up one Responder request.

    We are hardcoding a bunch of stuff, and doing the bare minimum to get a
    response from fpm-php :)

    The returned list contains, in order: one BEGIN_REQUEST record, the PARAMS
    stream (data records followed by an empty terminator) and the STDIN stream
    (data records followed by an empty terminator). All records share one
    randomly chosen request id.

    Args:
        params: CGI-style parameters to send in the PARAMS stream.
        request_body: Optional request body bytes for the STDIN stream. When
            None, only the empty STDIN terminator is sent.

    Returns:
        The encoded records, ready to be written to the socket one by one.
    """
    # A record's body length is a 16-bit field, so no record may carry more.
    max_record_body = 0xFFFF
    # Use 32 KB body records to mimic the behavior of the nginx FastCGI module.
    # Note that we're copying the request body into memory and later sending
    # the full records with send() (in send_data).
    # nginx (on Linux) uses writev() to send the record header, and then
    # sendfile() to send the record body (at least for larger bodies).
    # We are not doing that here to keep things simple.
    stdin_chunk_size = 32 * 1024

    request_id = random.randint(1, 65535)

    # Begin request: https://www.mit.edu/~yandros/doc/specs/fcgi-spec.html#S5.1
    # Bytes 0-1 hold the role (big-endian), byte 2 the flags. The flags stay 0,
    # i.e. FCGI_KEEP_CONN is deliberately not set.
    begin_body = bytearray(8)
    begin_body[1] = FCGI_ROLE_RESPONDER
    request = [create_fcgi_record(FCGI_BEGIN_RECORD, request_id, begin_body)]

    # Parameters. PARAMS is a stream, so a payload larger than one record can
    # hold is simply continued in the following record(s).
    if params:
        params_body = bytes(encode_fcgi_name_value_pairs(params))
        for offset in range(0, len(params_body), max_record_body):
            request.append(create_fcgi_record(
                FCGI_PARAMS_RECORD, request_id,
                params_body[offset:offset + max_record_body]))
    # An empty parameter record indicates the end of the parameters
    request.append(create_fcgi_record(FCGI_PARAMS_RECORD, request_id, b''))

    # Split the request body into FastCGI STDIN records of 8 bytes (header) + 32 KB (body).
    # See: https://web.mit.edu/~yandros/doc/specs/fcgi-spec.html#S5.3
    if request_body is not None:
        for offset in range(0, len(request_body), stdin_chunk_size):
            request.append(create_fcgi_record(
                FCGI_STDIN_RECORD, request_id,
                request_body[offset:offset + stdin_chunk_size]))
    # An empty record indicates the end of the request body
    request.append(create_fcgi_record(FCGI_STDIN_RECORD, request_id, b''))
    return request
# Currently not being used, but the idea is to parse the response from the server
# and return the response type and body. But since we're not doing anything with
# the response, we're just printing it, we don't need this
def parse_fcgi_response(response: bytes) -> Tuple[int, bytes]:
    """Parse the first FastCGI record found at the start of ``response``.

    The body length is taken from the record header (bytes 4-5, big-endian),
    so padding and anything following the record are ignored. Guessing the
    padding from the last byte of the buffer does not work: that byte is
    payload or a zero pad byte, never the padding length.

    Args:
        response: Raw bytes starting at a record header.

    Returns:
        A ``(record_type, body)`` tuple for the first record.

    Raises:
        ValueError: If ``response`` is shorter than the 8-byte header or than
            the body length the header announces.
    """
    header_len = 8
    if len(response) < header_len:
        raise ValueError(
            f"FastCGI record truncated: need {header_len} header bytes, got {len(response)}")
    response_type = response[1]
    content_len = int.from_bytes(response[4:6], 'big')
    # Skip over the header and keep exactly the announced body (no padding)
    body = bytes(response[header_len:header_len + content_len])
    if len(body) < content_len:
        raise ValueError(
            f"FastCGI record truncated: header announces {content_len} body bytes, "
            f"got {len(body)}")
    return response_type, body
# Same as print but prepend the thread id before the string
orig_print = print


def print(s: str = "", *args, **kwargs):
    """Print ``s`` prefixed with the native id of the calling thread.

    This deliberately replaces the builtin ``print`` for the rest of the
    module so output of parallel workers can be told apart. Extra positional
    and keyword arguments are forwarded to the builtin, so calls such as
    ``print(a, b)`` or ``print(msg, file=sys.stderr)`` keep working.
    """
    orig_print(f"[{threading.get_native_id()}] {s}", *args, **kwargs)
def send_data(s: socket.socket, data: bytes, timeout: int=0):
    """Send all of ``data`` on a non-blocking socket.

    Uses epoll to wait for the socket to be ready for writing. Returns when
    all data has been sent; raises on error.
    See: https://docs.python.org/3/library/select.html#select.epoll

    Args:
        s: Socket to write to.
        data: Bytes to send.
        timeout: Maximum number of seconds to wait without making progress.
            0 means wait forever.

    Raises:
        TimeoutError: If no byte could be written for more than ``timeout``
            seconds.
        OSError: Any socket error other than EAGAIN/EWOULDBLOCK.
    """
    poll_timeout = 10
    if timeout and timeout < poll_timeout:
        poll_timeout = timeout

    # A memoryview lets us retry from the current offset without copying the
    # unsent tail on every send() call.
    view = memoryview(data)
    total = len(view)
    bytes_sent = 0
    # Monotonic clock: only elapsed intervals matter here.
    last_write = time.monotonic()

    # The context manager closes the epoll fd on every exit path, including
    # the timeout and error paths; closing it also drops the registration.
    with select.epoll() as epoll:
        # Register for EPOLLOUT to wait for the socket to be ready for writing
        epoll.register(s, select.EPOLLOUT|select.EPOLLERR|select.EPOLLHUP)

        # Keep sending until we've sent all the data
        while bytes_sent < total:
            events = epoll.poll(poll_timeout)
            if not events:
                now = time.monotonic()
                print(f"Socket not ready for writing. bytes sent: {bytes_sent}, "
                      f"time since last write: {int(now-last_write)} seconds")
                if timeout and now - last_write > timeout:
                    raise TimeoutError(f"Timeout of {timeout} seconds reached")
                continue

            # epoll says we can write now, write until we've sent everything or until
            # we get EAGAIN, in which case we need to wait for write readiness.
            while bytes_sent < total:
                try:
                    send_size = s.send(view[bytes_sent:])
                except OSError as err:
                    # If we get EAGAIN, we need to wait for the socket to be ready again.
                    if err.errno in (errno.EAGAIN, errno.EWOULDBLOCK):
                        print(f"Socket is not ready (EAGAIN) {total-bytes_sent} bytes remaining. "
                              "Stopping send until ready...")
                        break
                    # Any other error is unexpected and likely fatal. We must stop sending
                    # and raise an exception
                    raise
                last_write = time.monotonic()
                bytes_sent += send_size
def recv_data(s: socket.socket, timeout: int=0) -> bytes:
    """Read from a non-blocking socket until EOF or until it would block.

    Uses epoll to wait for the socket to be ready for reading. Returns when
    EOF is reached or when recv() reports EAGAIN after a readiness
    notification; raises on error.
    See: https://docs.python.org/3/library/select.html#select.epoll

    Args:
        s: Socket to read from.
        timeout: Maximum number of seconds to wait without receiving data.
            0 means wait forever.

    Returns:
        Everything received during this call (empty if the peer had already
        closed the connection).

    Raises:
        TimeoutError: If nothing was received for more than ``timeout``
            seconds.
        OSError: Any socket error other than EAGAIN/EWOULDBLOCK.
    """
    poll_timeout = 10
    if timeout and timeout < poll_timeout:
        poll_timeout = timeout

    data = bytearray()
    # Monotonic clock: only elapsed intervals matter here.
    last_read = time.monotonic()

    # The context manager closes the epoll fd on every exit path, including
    # the timeout and error paths; closing it also drops the registration.
    with select.epoll() as epoll:
        # Register for EPOLLIN to wait for the socket to be ready for reading
        epoll.register(s, select.EPOLLIN|select.EPOLLERR|select.EPOLLHUP)

        while True:
            events = epoll.poll(poll_timeout)
            if not events:
                now = time.monotonic()
                print(f"Socket not ready for reading. bytes read: {len(data)}, "
                      f"time since last read: {int(now-last_read)} seconds")
                if timeout and now - last_read > timeout:
                    raise TimeoutError(f"Timeout of {timeout} seconds reached")
                continue

            # epoll says we can read now
            try:
                chunk = s.recv(8192)
            except OSError as err:
                # If we get EAGAIN, the socket is not closed but no further data is
                # available (for now). Return what we have.
                if err.errno in (errno.EAGAIN, errno.EWOULDBLOCK):
                    print("Socket is not ready (EAGAIN). Returning what we've got "
                          f"({len(data)} bytes)")
                    break
                # Any other error is unexpected and likely fatal. We must stop reading
                # and raise an exception
                raise

            # Empty chunk means EOF. We're done.
            if not chunk:
                break
            last_read = time.monotonic()
            data += chunk

    return bytes(data)
def send_request(address, request_body: bytes, partial: bool, timeout: int):
    """Send one FastCGI request to a server and print the raw response.

    The list of parameters was taken from a curl request to a nginx server.
    Every failure is reported with print() and ends the call; nothing is
    raised to the caller for address, connect, send or receive errors.

    Args:
        address: ``HOST:PORT`` (an IPv6 literal may be written ``[addr]:PORT``)
            or ``unix:/path/to/socket``.
        request_body: Body sent in the STDIN stream; its length is also sent
            as CONTENT_LENGTH.
        partial: If True, the socket is closed right after a randomly chosen
            record has been sent, so the request is left incomplete.
        timeout: Per-operation timeout in seconds passed to send_data and
            recv_data. 0 means no timeout.
    """
    # Parse the address
    if address.startswith("unix:"):
        # Unix socket
        sockaddr = address[5:]
        family = socket.AF_UNIX
    else:
        # TCP socket
        if ":" not in address:
            print("Error: Invalid address format. Expected IP:PORT")
            return
        # Split on the last colon only, so hosts that contain colons
        # (IPv6 literals) do not break the unpacking.
        host, portstr = address.rsplit(":", 1)
        if host.startswith("[") and host.endswith("]"):
            host = host[1:-1]
        try:
            addr_info = socket.getaddrinfo(host, portstr, proto=socket.IPPROTO_TCP)
            family, _, _, _, sockaddr = addr_info[0]
        except socket.gaierror:
            print(f"Error: Unable to resolve hostname {host}.")
            return
        address = f"{sockaddr[0]}:{portstr}"

    # The with-block closes the socket on every exit path below, including
    # the early returns after a failed connect, send or receive.
    with socket.socket(family, socket.SOCK_STREAM) as s:
        # Connect to the server
        s.setblocking(False)
        try:
            print(f"Connecting to {address}...")
            s.connect(sockaddr)
        except OSError as err:
            # EINPROGRESS is the normal outcome of a non-blocking connect;
            # a failed connection then surfaces on the first send.
            if err.errno != errno.EINPROGRESS:
                print(f"Error: Unable to connect to {address}: {err}")
                return

        # nginx seems to use the following params:
        params: dict[str, str|int] = {
            "PATH_INFO": "",
            "SCRIPT_FILENAME": "/var/www/html/index.php",
            "QUERY_STRING": "",
            "REQUEST_METHOD": "POST",
            "CONTENT_TYPE": "text/plain",
            "CONTENT_LENGTH": len(request_body),
            "SCRIPT_NAME": "/index.php",
            "REQUEST_URI": "/index.php",
            "DOCUMENT_URI": "/index.php",
            "DOCUMENT_ROOT": "/var/www/html",
            "SERVER_PROTOCOL": "HTTP/1.1",
            "REQUEST_SCHEME": "http",
            "GATEWAY_INTERFACE": "CGI/1.1",
            "SERVER_SOFTWARE": "nginx/1.24.0",
            "REMOTE_ADDR": "",
            "REMOTE_PORT": "",
            "REMOTE_USER": "",
            "SERVER_ADDR": "",
            "SERVER_PORT": "80",
            "SERVER_NAME": "_",
            "REDIRECT_STATUS": "200",
            "HTTP_HOST": "",
            "HTTP_USER_AGENT": "curl/7.81.0",
            "HTTP_ACCEPT": "*/*",
            "HTTP_EXPECT": "100-continue",
        }
        fcgi_request = create_fcgi_request(params, request_body)
        num_records = len(fcgi_request)
        print(f"Sending {num_records} FastCGI records...")

        # Index of the record after which the socket is closed. When not in
        # partial mode the index is past the end, so it never matches.
        last_record = random.randint(0, num_records-1) if partial else num_records

        # Send each record of the request
        for i, record in enumerate(fcgi_request):
            try:
                send_data(s, record, timeout)
            except OSError as oserr:
                print(f"Error sending record {i+1}/{num_records}: {oserr}. Closing socket")
                return
            bodylen = len(record) - FCGI_HEADER_LEN
            print(f" * Sent record {i+1}/{num_records} ({FCGI_HEADER_LEN} + {bodylen} bytes)")
            if i == last_record:
                print(f"Closing socket after record {i+1}!")
                return

        print(f"Finished sending {num_records} records. Waiting for response...")
        response = bytearray()
        # recv_data returns an empty result once the server has closed the
        # connection; keep collecting until then.
        while True:
            try:
                data = recv_data(s, timeout)
            except OSError as err:
                print(f"Error reading server response: {err}. Closing socket")
                return
            if not data:
                break
            response += data
        print(f"Raw server response: {response}")
# Script entry point: parse the command line, load the request body and send
# it to the server from one thread per requested parallel request.
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Send a request to a FastCGI server")
    parser.add_argument("address", help="The address of the server in the format IP:PORT or unix:/path/to/socket")
    parser.add_argument("body_path", help="Path to the file containing the body")
    parser.add_argument("--partial", default=False, action="store_true", help="Close the socket at a random record if enabled")
    parser.add_argument("--timeout", default=0, type=int, help="Timeout in seconds for socket read and write operations")
    parser.add_argument("--parallel", default=1, type=int, help="Number of parallel requests to send")
    parser.add_argument("--keep-running", default=False, action="store_true", help="Keep running after all requests have been sent")
    args = parser.parse_args()

    # Try to read the body file
    try:
        with open(args.body_path, 'rb') as f:
            request_body = f.read()
    except FileNotFoundError:
        print(f"Error: File {args.body_path} not found.")
        sys.exit(-1)
    except OSError as err:
        # Permission denied, path is a directory, I/O error, ...: report it
        # the same way as a missing file instead of dying with a traceback.
        print(f"Error: Unable to read {args.body_path}: {err}")
        sys.exit(-1)

    # Mimic curl by stripping all \r\n instances from the body
    request_body = request_body.replace(b"\r\n", b"")

    # Start the workers
    workers = []
    for i in range(args.parallel):
        print(f"Sending request {i+1}/{args.parallel}")
        worker = threading.Thread(target=send_request,
                                  args=(args.address, request_body, args.partial, args.timeout))
        # Small pause before each start so the requests are slightly staggered.
        time.sleep(0.02)
        worker.start()
        workers.append(worker)

    if not args.keep_running:
        for worker in workers:
            worker.join()
        sys.exit(0)

    print("Running. Press Ctrl+C to exit...")
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        pass
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment