Last active
March 3, 2026 19:05
-
-
Save superstes/84e46a1f77a5bae9b5ad4cd1f4900477 to your computer and use it in GitHub Desktop.
Netfilter Rate-Limit Test-Script (NFTables/IPTables)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| # Netfilter rate limits use the 'token bucket algorithm'. | |
| # How this rate-limit algorithm behaves can be a bit unintuitive: | |
| # it expects the packets allowed by the limit to be spread somewhat evenly over the whole time window (second/minute/..), | |
| # and it punishes short-term overages beyond the burst capacity. | |
| # limit source code: https://github.com/torvalds/linux/blob/master/net/netfilter/nft_limit.c | |
| # this script provides a way to easily test "rate-limit + burst" configurations | |
| # to get practical data you can simply run "tcpdump" on your target system and extract the packet-times from its output | |
| import time | |
# ANSI escape sequences used to colorize the ALLOWED/DROPPED output on the terminal
COLOR_RED = '\033[91m'
COLOR_GREEN = '\033[92m'
COLOR_RESET = '\033[0m'  # switches the terminal back to its default colors
| # AI DISCLAIMER: the script-base was generated by AI 'Gemini 2.5 Flash' | |
class NetfilterTokenBucket:
    """
    A Python simulation of Netfilter's nftables token bucket rate limit.

    Based on the logic found in net/netfilter/nft_limit.c in the Linux kernel.

    The bucket starts full (`burst` tokens), is refilled continuously at
    `rate / unit_seconds` tokens per second (capped at `burst`), and every
    allowed packet costs exactly one token.
    """

    def __init__(self, rate: int, burst: int, unit_seconds: float = 1.0, log_packets: bool = True):
        """
        Initializes the token bucket and prints its configuration.

        Args:
            rate (int): The maximum number of packets per `unit_seconds`.
            burst (int): The maximum number of tokens (packets) allowed in a burst.
                This is the bucket's capacity.
            unit_seconds (float): The time unit for the rate (e.g., 1.0 for /second,
                60.0 for /minute, 3600.0 for /hour).
            log_packets (bool): Set to False to disable logging of single packets -
                useful for high-load tests (e.g. DoS).

        Raises:
            ValueError: If `rate`, `burst` or `unit_seconds` is not positive.
        """
        if rate <= 0 or burst <= 0:
            raise ValueError("Rate and burst must be positive integers.")
        if unit_seconds <= 0:
            raise ValueError("Unit seconds must be positive.")

        self.rate = rate
        self.burst = burst
        self.unit_seconds = unit_seconds
        self.log_packets = log_packets

        # State variables, similar to nft_limit_data in the kernel
        self.tokens = float(burst)  # Initialize with full burst capacity
        self.last_update_time = time.monotonic()

        # The rate is packets per `unit_seconds`. We need tokens per second.
        self._tokens_per_sec = float(rate) / unit_seconds

        print("Token Bucket Initialized:")
        print(f" Rate: {self.rate} packets / {self.unit_seconds} seconds")
        print(f" Burst: {self.burst} tokens")
        print(f" Tokens per second: {self._tokens_per_sec:.4f}")
        print(f" Initial tokens: {self.tokens}")
        # Only a shortened form of the timestamp is shown: the first four
        # characters of the formatted value are cut off.
        ts = f"{self.last_update_time:.6f}"
        print(f" Last update time: {ts[4:]}")

    def _refill(self, current_time: float) -> None:
        """
        Adds the tokens generated since the last update, capped at `burst`.

        Nothing changes (neither the token count nor the stored timestamp)
        when `current_time` is not later than the last update.
        """
        elapsed_time = current_time - self.last_update_time
        if elapsed_time > 0:
            generated_tokens = elapsed_time * self._tokens_per_sec
            # float() keeps `tokens` a float even when the cap is hit
            self.tokens = min(float(self.burst), self.tokens + generated_tokens)
            self.last_update_time = current_time

    def allow_packet(self, current_time: float = None) -> bool:
        """
        Checks if a packet should be allowed based on the token bucket algorithm.

        The bucket is refilled for the time elapsed since the last update and
        then one token is consumed if at least one whole token is available.

        Args:
            current_time (float, optional): The current time. If None, uses time.monotonic().
                Useful for deterministic testing.

        Returns:
            bool: True if the packet is allowed, False if it's dropped.
        """
        if current_time is None:
            current_time = time.monotonic()

        self._refill(current_time)

        # A packet needs one whole token; fractional tokens are not enough
        if self.tokens >= 1.0:
            self.tokens -= 1.0
            return True
        return False

    def get_current_tokens(self) -> float:
        """
        Returns the current number of tokens in the bucket.

        The bucket is refilled up to time.monotonic() first; no token is
        consumed by this call.
        """
        self._refill(time.monotonic())
        return self.tokens

    def get_last_update_time(self) -> float:
        """Returns the timestamp of the last token bucket update."""
        return self.last_update_time
def run_scenario(bucket: NetfilterTokenBucket, packet_arrival_times: list):
    """
    Runs a simulation scenario with predefined packet delays and prints a summary.

    Args:
        bucket (NetfilterTokenBucket): The initialized token bucket instance.
            Its state is modified, so reusing it carries the state over into
            the next scenario.
        packet_arrival_times (list): One delay in seconds per packet. Each
            delay is added to the bucket's last update time, i.e. it is the
            time since the previous packet, not an offset from the start of
            the scenario. An empty list is reported as 0 packets.
    """
    if bucket.log_packets:
        print("\n--- Running Scenario ---")
        ts = f"{time.monotonic():.6f}"
        print(f"Scenario Start Time: {ts[4:]}")

    passed_packets = 0
    dropped_packets = 0

    for packet_no, delay in enumerate(packet_arrival_times, start=1):
        # Simulate time passing: advance the bucket's own clock by the delay
        current_sim_time = bucket.get_last_update_time() + delay
        # Using the absolute time here for consistency in bucket's internal clock
        allowed = bucket.allow_packet(current_time=current_sim_time)

        if allowed:
            passed_packets += 1
        else:
            dropped_packets += 1

        if bucket.log_packets:
            status = f"{COLOR_GREEN}ALLOWED{COLOR_RESET}" if allowed else f"{COLOR_RED}DROPPED{COLOR_RESET}"
            # NOTE(review): the label is the simulated absolute time minus the
            # first delay, with its first four characters cut off - confirm
            # this is the intended display value.
            ts = f"{current_sim_time - packet_arrival_times[0]:.4f}"
            print(f"[{ts[4:]}] Packet {packet_no:_}: {status} - @bucket: {bucket.tokens:.2f}")

        # Ensure that the time advances correctly even if packet is dropped
        bucket.last_update_time = current_sim_time

    print("\n--- Scenario Results ---")
    total_packets = len(packet_arrival_times)
    print(f"Total Packets: {total_packets:_}")
    print(f'Time window: {round(sum(packet_arrival_times), 2)}s')

    # Guard against division by zero when no packets were supplied
    if total_packets:
        passed_percent = round((passed_packets / total_packets) * 100)
        dropped_percent = round((dropped_packets / total_packets) * 100)
    else:
        passed_percent = 0
        dropped_percent = 0

    print(f"{COLOR_GREEN}Passed: {passed_packets:_} ({passed_percent}%){COLOR_RESET}")
    print(f"{COLOR_RED}Dropped: {dropped_packets:_} ({dropped_percent}%){COLOR_RESET}")
def scenario_head(c: str):
    """Prints `c` as a banner framed by '#' characters, preceded by two blank lines."""
    title_line = "## " + c + " ##"
    border = "#" * len(title_line)
    banner = "\n".join((border, title_line, border))
    print(f"\n\n{banner}")
def get_packet_times_gradient(delay_ms: int, count: int) -> list[float]:
    """
    Builds the delays for a steady stream of packets.

    Returns a list of `count` entries, each being `delay_ms` converted to seconds.
    """
    delay_sec = delay_ms / 1000
    return [delay_sec] * count
def get_packet_times_fast_startup(delay_ms: int, count: int, init_count: int = 150) -> list[float]:
    """
    Builds the delays for a stream that starts fast and then becomes steady.

    The first `init_count` entries are `index * delay / count` (starting at 0),
    every later entry is the plain delay of `delay_ms` converted to seconds.
    The list has `count` entries in total.
    """
    steady_delay = delay_ms / 1000
    delays = []
    for index in range(count):
        if index < init_count:
            # shortened delay during the startup phase
            delays.append((index * steady_delay) / count)
        else:
            delays.append(steady_delay)
    return delays
def get_packet_times_dos(pps: int = 30_000, sec: int = 1) -> list[float]:
    """
    Builds the delays for a flood of `pps` packets per second lasting `sec` seconds.

    The per-packet delay is 1000 / pps milliseconds and the stream holds
    pps * sec packets; the list itself is produced by get_packet_times_gradient.
    """
    return get_packet_times_gradient(1000 / pps, pps * sec)
def main():
    """
    Runs the three demo scenarios (steady stream, fast startup, DoS flood).

    All scenarios share one bucket (500 packets/second, burst 5, per-packet
    logging disabled), so the bucket state carries over from one scenario
    to the next.
    """
    print("### Netfilter Token Bucket Simulation ###")
    bucket = NetfilterTokenBucket(rate=500, burst=5, unit_seconds=1.0, log_packets=False)

    # steady stream: 150 packets, one every 4 ms
    scenario_head("SCENARIO DUMMY")
    run_scenario(bucket, get_packet_times_gradient(4, 150))

    # 150 packets, the first 50 with shortened delays, then one every 4 ms
    scenario_head("SCENARIO CLIENT HTTP3")
    run_scenario(bucket, get_packet_times_fast_startup(4, 150, 50))

    # flood at the default packet rate for 5 seconds
    scenario_head("SCENARIO DOS")
    run_scenario(bucket, get_packet_times_dos(sec=5))
# Run the simulation only when this file is executed as a script, not when it is imported
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment