Skip to content

Instantly share code, notes, and snippets.

@superstes
Last active March 3, 2026 19:05
Show Gist options
  • Select an option

  • Save superstes/84e46a1f77a5bae9b5ad4cd1f4900477 to your computer and use it in GitHub Desktop.

Select an option

Save superstes/84e46a1f77a5bae9b5ad4cd1f4900477 to your computer and use it in GitHub Desktop.
Netfilter Rate-Limit Test-Script (NFTables/IPTables)
#!/usr/bin/env python3
# netfilter's rate limit uses the 'token bucket algorithm'
# how this rate-limit algorithm behaves can sometimes be a bit unintuitive
# the algorithm expects the packets allowed by the limit to be spread fairly evenly over the whole time window (second/minute/..)
# the token bucket punishes short-term overages beyond the burst capacity
# limit source code: https://github.com/torvalds/linux/blob/master/net/netfilter/nft_limit.c
# this script provides a way to easily test "rate-limit + burst" configurations
# to get practical data you can simply run "tcpdump" on your target system and extract the packet-times from its output
import time
# ANSI escape sequences used to colorize the ALLOWED/DROPPED output on the terminal;
# COLOR_RESET switches back to the terminal's default style
COLOR_RED = '\033[91m'
COLOR_GREEN = '\033[92m'
COLOR_RESET = '\033[0m'
# AI DISCLAIMER: the script-base was generated by AI 'Gemini 2.5 Flash'
class NetfilterTokenBucket:
    """
    A Python simulation of Netfilter's nftables token bucket rate limit.

    Based on the logic found in net/netfilter/nft_limit.c in the Linux kernel.
    Tokens are tracked as a float: they refill continuously at
    `rate / unit_seconds` tokens per second, are capped at `burst`, and every
    allowed packet consumes exactly one token.
    """

    def __init__(self, rate: int, burst: int, unit_seconds: float = 1.0, log_packets: bool = True):
        """
        Initializes the token bucket and prints its configuration.

        Args:
            rate (int): The maximum number of packets per `unit_seconds`.
            burst (int): The maximum number of tokens (packets) allowed in a burst.
                This is the bucket's capacity.
            unit_seconds (float): The time unit for the rate (e.g., 1.0 for /second,
                60.0 for /minute, 3600.0 for /hour).
            log_packets (bool): Set to False to disable logging of single packets -
                useful for high-load tests (e.g. DoS).

        Raises:
            ValueError: If `rate`, `burst` or `unit_seconds` is not positive.
        """
        if rate <= 0 or burst <= 0:
            raise ValueError("Rate and burst must be positive integers.")
        if unit_seconds <= 0:
            raise ValueError("Unit seconds must be positive.")

        self.rate = rate
        self.burst = burst
        self.unit_seconds = unit_seconds
        self.log_packets = log_packets

        # State variables, similar to nft_limit_data in the kernel
        self.tokens = float(burst)  # Initialize with full burst capacity
        self.last_update_time = time.monotonic()

        # The rate is packets per `unit_seconds`. We need tokens per second.
        self._tokens_per_sec = float(rate) / unit_seconds

        print("Token Bucket Initialized:")
        print(f" Rate: {self.rate} packets / {self.unit_seconds} seconds")
        print(f" Burst: {self.burst} tokens")
        print(f" Tokens per second: {self._tokens_per_sec:.4f}")
        print(f" Initial tokens: {self.tokens}")
        # Print the full timestamp: slicing off a fixed number of leading characters
        # would drop a host-uptime-dependent amount of significant digits
        print(f" Last update time: {self.last_update_time:.6f}")

    def _refill(self, current_time: float) -> None:
        """Adds the tokens generated since the last update, capped at `burst`."""
        elapsed_time = current_time - self.last_update_time
        # Only replenish (and advance the clock) if time has actually passed
        if elapsed_time > 0:
            generated_tokens = elapsed_time * self._tokens_per_sec
            # Cap with a float so `tokens` never silently turns into an int
            self.tokens = min(float(self.burst), self.tokens + generated_tokens)
            self.last_update_time = current_time

    def allow_packet(self, current_time: float = None) -> bool:
        """
        Checks if a packet should be allowed based on the token bucket algorithm.

        The bucket is refilled for the time elapsed since the last update first;
        the packet is then allowed if at least one whole token is available.

        Args:
            current_time (float, optional): The current time. If None, uses time.monotonic().
                Useful for deterministic testing.

        Returns:
            bool: True if the packet is allowed (one token is consumed),
                False if it's dropped (the token count is left as is).
        """
        if current_time is None:
            current_time = time.monotonic()

        self._refill(current_time)

        # Check if there's at least one token available
        if self.tokens >= 1.0:
            self.tokens -= 1.0
            return True
        return False

    def get_current_tokens(self) -> float:
        """
        Returns the current number of tokens in the bucket.

        The bucket is refilled up to time.monotonic() first; no token is consumed.
        """
        # Refill only - going through allow_packet() here would consume a token
        self._refill(time.monotonic())
        return self.tokens

    def get_last_update_time(self) -> float:
        """Returns the timestamp of the last token bucket update."""
        return self.last_update_time
def run_scenario(bucket: NetfilterTokenBucket, packet_arrival_times: list):
    """
    Runs a simulation scenario with predefined packet arrival delays.

    Each packet is fed to the bucket at "time of the previous packet + its delay",
    so the bucket's clock is advanced in simulated time rather than wall-clock time.
    A summary (total, time window, passed/dropped counts and percentages) is
    printed at the end.

    Args:
        bucket (NetfilterTokenBucket): The initialized token bucket instance.
            Its state (tokens, clock) is modified by the scenario.
        packet_arrival_times (list): One delay in seconds per packet, each relative
            to the arrival of the previous packet (the first one is relative to
            the bucket's last update time). May be empty.
    """
    if bucket.log_packets:
        print("\n--- Running Scenario ---")
        print(f"Scenario Start Time: {time.monotonic():.6f}")

    # Reference point for the per-packet log timestamps (simulated clock)
    scenario_start_time = bucket.get_last_update_time()
    passed_packets = 0
    dropped_packets = 0

    for packet_no, arrival_delay in enumerate(packet_arrival_times, start=1):
        # Simulate time passing
        current_sim_time = bucket.get_last_update_time() + arrival_delay
        # Using the absolute time here for consistency in bucket's internal clock
        allowed = bucket.allow_packet(current_time=current_sim_time)
        if allowed:
            passed_packets += 1
        else:
            dropped_packets += 1

        if bucket.log_packets:
            status = f"{COLOR_GREEN}ALLOWED{COLOR_RESET}" if allowed else f"{COLOR_RED}DROPPED{COLOR_RESET}"
            # Simulated seconds since the scenario started
            elapsed = current_sim_time - scenario_start_time
            print(f"[{elapsed:.4f}] Packet {packet_no:_}: {status} - @bucket: {bucket.tokens:.2f}")

        # Ensure that the time advances correctly even if packet is dropped
        bucket.last_update_time = current_sim_time

    print("\n--- Scenario Results ---")
    total_packets = len(packet_arrival_times)
    print(f"Total Packets: {total_packets:_}")
    print(f'Time window: {round(sum(packet_arrival_times), 2)}s')

    if total_packets:
        passed_percent = round((passed_packets / total_packets) * 100)
        dropped_percent = round((dropped_packets / total_packets) * 100)
    else:
        # Empty scenario: avoid dividing by zero
        passed_percent = dropped_percent = 0

    print(f"{COLOR_GREEN}Passed: {passed_packets:_} ({passed_percent}%){COLOR_RESET}")
    print(f"{COLOR_RED}Dropped: {dropped_packets:_} ({dropped_percent}%){COLOR_RESET}")
def scenario_head(c: str):
    """Prints `c` as a banner framed by '#' characters, preceded by two blank lines."""
    title = "## " + c + " ##"
    border = "#" * len(title)
    print("\n\n" + "\n".join((border, title, border)))
def get_packet_times_gradient(delay_ms: float, count: int) -> list[float]:
    """
    Builds a steady stream of packets with a constant delay between them.

    Args:
        delay_ms (float): Delay between two packets in milliseconds
            (fractional values are fine, e.g. for very high packet rates).
        count (int): Number of packets to generate.

    Returns:
        list[float]: `count` entries, each being `delay_ms` converted to seconds.
    """
    # The delay is the same for every packet - convert ms to s only once
    delay_s = delay_ms / 1000
    return [delay_s for _ in range(count)]
def get_packet_times_fast_startup(delay_ms: int, count: int, init_count: int = 150) -> list[float]:
    """
    Builds a stream that starts with fast packets and then becomes steady.

    Args:
        delay_ms (int): Steady-state delay between two packets in milliseconds.
        count (int): Total number of packets to generate.
        init_count (int): Number of leading packets that get the shortened delay.

    Returns:
        list[float]: `count` delays in seconds. Packet `i` of the first `init_count`
            packets gets `i * delay / count`, all later packets get the full delay.
    """
    steady_delay = delay_ms / 1000
    delays = []
    for idx in range(count):
        if idx >= init_count:
            # startup phase is over: regular stream
            delays.append(steady_delay)
        else:
            # startup phase: delay grows with the packet index, scaled by the total count
            delays.append((idx * steady_delay) / count)
    return delays
def get_packet_times_dos(pps: int = 30_000, sec: int = 1) -> list[float]:
    """
    Builds a high-rate, evenly spaced packet stream (DoS-like load).

    Args:
        pps (int): Packets per second.
        sec (int): Duration of the stream in seconds.

    Returns:
        list[float]: `pps * sec` delays in seconds, as produced by
            get_packet_times_gradient for a delay of `1000 / pps` milliseconds.
    """
    interval_ms = 1000 / pps
    total_packets = pps * sec
    return get_packet_times_gradient(interval_ms, total_packets)
def main():
    """
    Runs the three demo scenarios (steady stream, fast startup, DoS).

    All scenarios share one bucket (500 packets/second, burst of 5, per-packet
    logging disabled), so the bucket state carries over from one scenario to the next.
    """
    print("### Netfilter Token Bucket Simulation ###")
    bucket = NetfilterTokenBucket(rate=500, burst=5, unit_seconds=1.0, log_packets=False)

    # (headline, factory for the packet delays) - delays are built lazily, after the headline is printed
    scenarios = (
        ("SCENARIO DUMMY", lambda: get_packet_times_gradient(4, 150)),
        ("SCENARIO CLIENT HTTP3", lambda: get_packet_times_fast_startup(4, 150, 50)),
        ("SCENARIO DOS", lambda: get_packet_times_dos(sec=5)),
    )
    for headline, build_packet_times in scenarios:
        scenario_head(headline)
        run_scenario(bucket, build_packet_times())


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment